PostgreSQL Source Code git master
basebackup_zstd.c
Go to the documentation of this file.
/*-------------------------------------------------------------------------
 *
 * basebackup_zstd.c
 *	  Basebackup sink implementing zstd compression.
 *
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *	  src/backend/backup/basebackup_zstd.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#ifdef USE_ZSTD
#include <zstd.h>
#endif

#include "backup/basebackup_sink.h"
#ifdef USE_ZSTD

typedef struct bbsink_zstd
{
	/* Common information for all types of sink. */
	bbsink		base;

	/*
	 * Compression options, as passed to bbsink_zstd_new().  The pointer is
	 * stored, not copied, and is read in bbsink_zstd_begin_backup().
	 */
	pg_compress_specification *compress;

	/*
	 * zstd compression context: created in begin_backup, freed in end_backup
	 * or, on failure, in cleanup.
	 */
	ZSTD_CCtx  *cctx;

	/*
	 * Output window pointing into the successor sink's buffer; compressed
	 * bytes accumulate here until they are handed to the successor.
	 */
	ZSTD_outBuffer zstd_outBuf;
} bbsink_zstd;

static void bbsink_zstd_begin_backup(bbsink *sink);
static void bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name);
static void bbsink_zstd_archive_contents(bbsink *sink, size_t len);
static void bbsink_zstd_manifest_contents(bbsink *sink, size_t len);
static void bbsink_zstd_end_archive(bbsink *sink);
static void bbsink_zstd_cleanup(bbsink *sink);
static void bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
								   TimeLineID endtli);

/*
 * Callbacks for this sink type.  begin_manifest and end_manifest are passed
 * straight through to the successor via the generic forwarding helpers.
 */
static const bbsink_ops bbsink_zstd_ops = {
	.begin_backup = bbsink_zstd_begin_backup,
	.begin_archive = bbsink_zstd_begin_archive,
	.archive_contents = bbsink_zstd_archive_contents,
	.end_archive = bbsink_zstd_end_archive,
	.begin_manifest = bbsink_forward_begin_manifest,
	.manifest_contents = bbsink_zstd_manifest_contents,
	.end_manifest = bbsink_forward_end_manifest,
	.end_backup = bbsink_zstd_end_backup,
	.cleanup = bbsink_zstd_cleanup
};
#endif

57/*
58 * Create a new basebackup sink that performs zstd compression.
59 */
60bbsink *
62{
63#ifndef USE_ZSTD
65 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66 errmsg("zstd compression is not supported by this build")));
67 return NULL; /* keep compiler quiet */
68#else
69 bbsink_zstd *sink;
70
71 Assert(next != NULL);
72
73 sink = palloc0(sizeof(bbsink_zstd));
74 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75 sink->base.bbs_next = next;
76 sink->compress = compress;
77
78 return &sink->base;
79#endif
80}
81
82#ifdef USE_ZSTD
83
84/*
85 * Begin backup.
86 */
87static void
88bbsink_zstd_begin_backup(bbsink *sink)
89{
90 bbsink_zstd *mysink = (bbsink_zstd *) sink;
91 size_t output_buffer_bound;
92 size_t ret;
93 pg_compress_specification *compress = mysink->compress;
94
95 mysink->cctx = ZSTD_createCCtx();
96 if (!mysink->cctx)
97 elog(ERROR, "could not create zstd compression context");
98
99 ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
100 compress->level);
101 if (ZSTD_isError(ret))
102 elog(ERROR, "could not set zstd compression level to %d: %s",
103 compress->level, ZSTD_getErrorName(ret));
104
105 if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
106 {
107 /*
108 * On older versions of libzstd, this option does not exist, and
109 * trying to set it will fail. Similarly for newer versions if they
110 * are compiled without threading support.
111 */
112 ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
113 compress->workers);
114 if (ZSTD_isError(ret))
116 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 errmsg("could not set compression worker count to %d: %s",
118 compress->workers, ZSTD_getErrorName(ret)));
119 }
120
121 if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
122 {
123 ret = ZSTD_CCtx_setParameter(mysink->cctx,
124 ZSTD_c_enableLongDistanceMatching,
125 compress->long_distance);
126 if (ZSTD_isError(ret))
128 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 errmsg("could not enable long-distance mode: %s",
130 ZSTD_getErrorName(ret)));
131 }
132
133 /*
134 * We need our own buffer, because we're going to pass different data to
135 * the next sink than what gets passed to us.
136 */
137 mysink->base.bbs_buffer = palloc(mysink->base.bbs_buffer_length);
138
139 /*
140 * Make sure that the next sink's bbs_buffer is big enough to accommodate
141 * the compressed input buffer.
142 */
143 output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
144
145 /*
146 * The buffer length is expected to be a multiple of BLCKSZ, so round up.
147 */
148 output_buffer_bound = output_buffer_bound + BLCKSZ -
149 (output_buffer_bound % BLCKSZ);
150
151 bbsink_begin_backup(sink->bbs_next, sink->bbs_state, output_buffer_bound);
152}
153
154/*
155 * Prepare to compress the next archive.
156 */
157static void
158bbsink_zstd_begin_archive(bbsink *sink, const char *archive_name)
159{
160 bbsink_zstd *mysink = (bbsink_zstd *) sink;
161 char *zstd_archive_name;
162
163 /*
164 * At the start of each archive we reset the state to start a new
165 * compression operation. The parameters are sticky and they will stick
166 * around as we are resetting with option ZSTD_reset_session_only.
167 */
168 ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
169
170 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
171 mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
172 mysink->zstd_outBuf.pos = 0;
173
174 /* Add ".zst" to the archive name. */
175 zstd_archive_name = psprintf("%s.zst", archive_name);
176 Assert(sink->bbs_next != NULL);
177 bbsink_begin_archive(sink->bbs_next, zstd_archive_name);
178 pfree(zstd_archive_name);
179}
180
181/*
182 * Compress the input data to the output buffer until we run out of input
183 * data. Each time the output buffer falls below the compression bound for
184 * the input buffer, invoke the archive_contents() method for the next sink.
185 *
186 * Note that since we're compressing the input, it may very commonly happen
187 * that we consume all the input data without filling the output buffer. In
188 * that case, the compressed representation of the current input data won't
189 * actually be sent to the next bbsink until a later call to this function,
190 * or perhaps even not until bbsink_zstd_end_archive() is invoked.
191 */
192static void
193bbsink_zstd_archive_contents(bbsink *sink, size_t len)
194{
195 bbsink_zstd *mysink = (bbsink_zstd *) sink;
196 ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer, len, 0};
197
198 while (inBuf.pos < inBuf.size)
199 {
200 size_t yet_to_flush;
201 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
202
203 /*
204 * If the out buffer is not left with enough space, send the output
205 * buffer to the next sink, and reset it.
206 */
207 if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
208 {
209 bbsink_archive_contents(mysink->base.bbs_next,
210 mysink->zstd_outBuf.pos);
211 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
212 mysink->zstd_outBuf.size =
213 mysink->base.bbs_next->bbs_buffer_length;
214 mysink->zstd_outBuf.pos = 0;
215 }
216
217 yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
218 &inBuf, ZSTD_e_continue);
219
220 if (ZSTD_isError(yet_to_flush))
221 elog(ERROR,
222 "could not compress data: %s",
223 ZSTD_getErrorName(yet_to_flush));
224 }
225}
226
227/*
228 * There might be some data inside zstd's internal buffers; we need to get that
229 * flushed out, also end the zstd frame and then get that forwarded to the
230 * successor sink as archive content.
231 *
232 * Then we can end processing for this archive.
233 */
234static void
235bbsink_zstd_end_archive(bbsink *sink)
236{
237 bbsink_zstd *mysink = (bbsink_zstd *) sink;
238 size_t yet_to_flush;
239
240 do
241 {
242 ZSTD_inBuffer in = {NULL, 0, 0};
243 size_t max_needed = ZSTD_compressBound(0);
244
245 /*
246 * If the out buffer is not left with enough space, send the output
247 * buffer to the next sink, and reset it.
248 */
249 if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
250 {
251 bbsink_archive_contents(mysink->base.bbs_next,
252 mysink->zstd_outBuf.pos);
253 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
254 mysink->zstd_outBuf.size =
255 mysink->base.bbs_next->bbs_buffer_length;
256 mysink->zstd_outBuf.pos = 0;
257 }
258
259 yet_to_flush = ZSTD_compressStream2(mysink->cctx,
260 &mysink->zstd_outBuf,
261 &in, ZSTD_e_end);
262
263 if (ZSTD_isError(yet_to_flush))
264 elog(ERROR, "could not compress data: %s",
265 ZSTD_getErrorName(yet_to_flush));
266
267 } while (yet_to_flush > 0);
268
269 /* Make sure to pass any remaining bytes to the next sink. */
270 if (mysink->zstd_outBuf.pos > 0)
271 bbsink_archive_contents(mysink->base.bbs_next,
272 mysink->zstd_outBuf.pos);
273
274 /* Pass on the information that this archive has ended. */
276}
277
278/*
279 * Free the resources and context.
280 */
281static void
282bbsink_zstd_end_backup(bbsink *sink, XLogRecPtr endptr,
283 TimeLineID endtli)
284{
285 bbsink_zstd *mysink = (bbsink_zstd *) sink;
286
287 /* Release the context. */
288 if (mysink->cctx)
289 {
290 ZSTD_freeCCtx(mysink->cctx);
291 mysink->cctx = NULL;
292 }
293
294 bbsink_forward_end_backup(sink, endptr, endtli);
295}
296
297/*
298 * Manifest contents are not compressed, but we do need to copy them into
299 * the successor sink's buffer, because we have our own.
300 */
301static void
302bbsink_zstd_manifest_contents(bbsink *sink, size_t len)
303{
304 memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
306}
307
308/*
309 * In case the backup fails, make sure we free any compression context that
310 * got allocated, so that we don't leak memory.
311 */
312static void
313bbsink_zstd_cleanup(bbsink *sink)
314{
315 bbsink_zstd *mysink = (bbsink_zstd *) sink;
316
317 /* Release the context if not already released. */
318 if (mysink->cctx)
319 {
320 ZSTD_freeCCtx(mysink->cctx);
321 mysink->cctx = NULL;
322 }
323}
324
325#endif
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
bbsink * bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
static int32 next
Definition: blutils.c:219
#define Assert(condition)
Definition: c.h:812
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
const void size_t len
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
uint64 XLogRecPtr
Definition: xlogdefs.h:21
uint32 TimeLineID
Definition: xlogdefs.h:59