PostgreSQL Source Code  git master
basebackup_zstd.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_zstd.c
4  * Basebackup sink implementing zstd compression.
5  *
6  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/basebackup_zstd.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #ifdef USE_ZSTD
16 #include <zstd.h>
17 #endif
18 
20 
21 #ifdef USE_ZSTD
22 
23 typedef struct bbsink_zstd
24 {
25  /* Common information for all types of sink. */
26  bbsink base;
27 
28  /* Compression options */
29  pg_compress_specification *compress;
30 
31  ZSTD_CCtx *cctx;
32  ZSTD_outBuffer zstd_outBuf;
33 } bbsink_zstd;
34 
35 static void bbsink_zstd_begin_backup(bbsink *sink);
36 static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
37 static void bbsink_zstd_archive_contents(bbsink *sink, size_t avail_in);
38 static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
39 static void bbsink_zstd_end_archive(bbsink *sink);
40 static void bbsink_zstd_cleanup(bbsink *sink);
41 static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
42  TimeLineID endtli);
43 
44 static const bbsink_ops bbsink_zstd_ops = {
45  .begin_backup = bbsink_zstd_begin_backup,
46  .begin_archive = bbsink_zstd_begin_archive,
47  .archive_contents = bbsink_zstd_archive_contents,
48  .end_archive = bbsink_zstd_end_archive,
49  .begin_manifest = bbsink_forward_begin_manifest,
50  .manifest_contents = bbsink_zstd_manifest_contents,
51  .end_manifest = bbsink_forward_end_manifest,
52  .end_backup = bbsink_zstd_end_backup,
53  .cleanup = bbsink_zstd_cleanup
54 };
55 #endif
56 
57 /*
58  * Create a new basebackup sink that performs zstd compression.
59  */
60 bbsink *
62 {
63 #ifndef USE_ZSTD
64  ereport(ERROR,
65  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66  errmsg("zstd compression is not supported by this build")));
67  return NULL; /* keep compiler quiet */
68 #else
69  bbsink_zstd *sink;
70 
71  Assert(next != NULL);
72 
73  sink = palloc0(sizeof(bbsink_zstd));
74  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75  sink->base.bbs_next = next;
76  sink->compress = compress;
77 
78  return &sink->base;
79 #endif
80 }
81 
82 #ifdef USE_ZSTD
83 
84 /*
85  * Begin backup.
86  */
87 static void
88 bbsink_zstd_begin_backup(bbsink *sink)
89 {
90  bbsink_zstd *mysink = (bbsink_zstd *) sink;
91  size_t output_buffer_bound;
92  size_t ret;
93  pg_compress_specification *compress = mysink->compress;
94 
95  mysink->cctx = ZSTD_createCCtx();
96  if (!mysink->cctx)
97  elog(ERROR, "could not create zstd compression context");
98 
99  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) != 0)
100  {
101  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
102  compress->level);
103  if (ZSTD_isError(ret))
104  elog(ERROR, "could not set zstd compression level to %d: %s",
105  compress->level, ZSTD_getErrorName(ret));
106  }
107 
108  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
109  {
110  /*
111  * On older versions of libzstd, this option does not exist, and
112  * trying to set it will fail. Similarly for newer versions if they
113  * are compiled without threading support.
114  */
115  ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
116  compress->workers);
117  if (ZSTD_isError(ret))
118  ereport(ERROR,
119  errcode(ERRCODE_INVALID_PARAMETER_VALUE),
120  errmsg("could not set compression worker count to %d: %s",
121  compress->workers, ZSTD_getErrorName(ret)));
122  }
123 
124  /*
125  * We need our own buffer, because we're going to pass different data to
126  * the next sink than what gets passed to us.
127  */
128  mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
129 
130  /*
131  * Make sure that the next sink's bbs_buffer is big enough to accommodate
132  * the compressed input buffer.
133  */
134  output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
135 
136  /*
137  * The buffer length is expected to be a multiple of BLCKSZ, so round up.
138  */
139  output_buffer_bound = output_buffer_bound + BLCKSZ -
140  (output_buffer_bound % BLCKSZ);
141 
142  bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
143 }
144 
145 /*
146  * Prepare to compress the next archive.
147  */
148 static void
149 bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
150 {
151  bbsink_zstd *mysink = (bbsink_zstd *) sink;
152  char *zstd_archive_name;
153 
154  /*
155  * At the start of each archive we reset the state to start a new
156  * compression operation. The parameters are sticky and they will stick
157  * around as we are resetting with option ZSTD_reset_session_only.
158  */
159  ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
160 
161  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
162  mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
163  mysink->zstd_outBuf.pos = 0;
164 
165  /* Add ".zst" to the archive name. */
166  zstd_archive_name = psprintf("%s.zst", archive_name);
167  Assert(sink->bbs_next != NULL);
168  bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
169  pfree(zstd_archive_name);
170 }
171 
172 /*
173  * Compress the input data to the output buffer until we run out of input
174  * data. Each time the output buffer falls below the compression bound for
175  * the input buffer, invoke the archive_contents() method for the next sink.
176  *
177  * Note that since we're compressing the input, it may very commonly happen
178  * that we consume all the input data without filling the output buffer. In
179  * that case, the compressed representation of the current input data won't
180  * actually be sent to the next bbsink until a later call to this function,
181  * or perhaps even not until bbsink_zstd_end_archive() is invoked.
182  */
183 static void
184 bbsink_zstd_archive_contents(bbsink *sink, size_t len)
185 {
186  bbsink_zstd *mysink = (bbsink_zstd *) sink;
187  ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
188 
189  while (inBuf.pos < inBuf.size)
190  {
191  size_t yet_to_flush;
192  size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
193 
194  /*
195  * If the out buffer is not left with enough space, send the output
196  * buffer to the next sink, and reset it.
197  */
198  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
199  {
200  bbsink_archive_contents(mysink->base.bbs_next,
201  mysink->zstd_outBuf.pos);
202  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
203  mysink->zstd_outBuf.size =
204  mysink->base.bbs_next->bbs_buffer_length;
205  mysink->zstd_outBuf.pos = 0;
206  }
207 
208  yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
209  &inBuf, ZSTD_e_continue);
210 
211  if (ZSTD_isError(yet_to_flush))
212  elog(ERROR,
213  "could not compress data: %s",
214  ZSTD_getErrorName(yet_to_flush));
215  }
216 }
217 
218 /*
219  * There might be some data inside zstd's internal buffers; we need to get that
220  * flushed out, also end the zstd frame and then get that forwarded to the
221  * successor sink as archive content.
222  *
223  * Then we can end processing for this archive.
224  */
225 static void
226 bbsink_zstd_end_archive(bbsink *sink)
227 {
228  bbsink_zstd *mysink = (bbsink_zstd *) sink;
229  size_t yet_to_flush;
230 
231  do
232  {
233  ZSTD_inBuffer in = {NULL, 0, 0};
234  size_t max_needed = ZSTD_compressBound(0);
235 
236  /*
237  * If the out buffer is not left with enough space, send the output
238  * buffer to the next sink, and reset it.
239  */
240  if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
241  {
242  bbsink_archive_contents(mysink->base.bbs_next,
243  mysink->zstd_outBuf.pos);
244  mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
245  mysink->zstd_outBuf.size =
246  mysink->base.bbs_next->bbs_buffer_length;
247  mysink->zstd_outBuf.pos = 0;
248  }
249 
250  yet_to_flush = ZSTD_compressStream2(mysink->cctx,
251  &mysink->zstd_outBuf,
252  &in, ZSTD_e_end);
253 
254  if (ZSTD_isError(yet_to_flush))
255  elog(ERROR, "could not compress data: %s",
256  ZSTD_getErrorName(yet_to_flush));
257 
258  } while (yet_to_flush > 0);
259 
260  /* Make sure to pass any remaining bytes to the next sink. */
261  if (mysink->zstd_outBuf.pos > 0)
262  bbsink_archive_contents(mysink->base.bbs_next,
263  mysink->zstd_outBuf.pos);
264 
265  /* Pass on the information that this archive has ended. */
267 }
268 
269 /*
270  * Free the resources and context.
271  */
272 static void
273 bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
274  TimeLineID endtli)
275 {
276  bbsink_zstd *mysink = (bbsink_zstd *) sink;
277 
278  /* Release the context. */
279  if (mysink->cctx)
280  {
281  ZSTD_freeCCtx(mysink->cctx);
282  mysink->cctx = NULL;
283  }
284 
285  bbsink_forward_end_backup(sink, endptr, endtli);
286 }
287 
288 /*
289  * Manifest contents are not compressed, but we do need to copy them into
290  * the successor sink's buffer, because we have our own.
291  */
292 static void
293 bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
294 {
295  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
297 }
298 
299 /*
300  * In case the backup fails, make sure we free any compression context that
301  * got allocated, so that we don't leak memory.
302  */
303 static void
304 bbsink_zstd_cleanup(bbsink *sink)
305 {
306  bbsink_zstd *mysink = (bbsink_zstd *) sink;
307 
308  /* Release the context if not already released. */
309  if (mysink->cctx)
310  {
311  ZSTD_freeCCtx(mysink->cctx);
312  mysink->cctx = NULL;
313  }
314 }
315 
316 #endif
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
bbsink * bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:219
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:26
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:35
#define ereport(elevel,...)
Definition: elog.h:145
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
void * palloc(Size size)
Definition: mcxt.c:1068
const void size_t len
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59