PostgreSQL Source Code git master
Loading...
Searching...
No Matches
basebackup_gzip.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * basebackup_gzip.c
4 * Basebackup sink implementing gzip compression.
5 *
6 * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/backup/basebackup_gzip.c
10 *
11 *-------------------------------------------------------------------------
12 */
13#include "postgres.h"
14
15#ifdef HAVE_LIBZ
16#include <zlib.h>
17#endif
18
20
21#ifdef HAVE_LIBZ
/*
 * Private state of the gzip-compressing sink.  The generic sink header
 * comes first; the constructor returns a pointer to it.
 *
 * NOTE(review): code later in this file uses mysink->zstream and
 * mysink->zstream_initialized, but the declarations of those members and
 * the closing "} bbsink_gzip;" are absent from this listing.  They appear
 * to have been dropped during extraction; restore them from the upstream
 * file before building.
 */
22typedef struct bbsink_gzip
23{
24 /* Common information for all types of sink. */
25 bbsink base;
26
27 /* Compression level, passed to deflateInit2() for each archive. */
28 int compresslevel;
29
30 /* Compressed data stream. */
32
33 /* Number of bytes staged in the next sink's buffer but not yet forwarded. */
34 size_t bytes_written;
35
36 /* Has the zstream been initialized? */
39
/*
 * Forward declarations for the gzip-specific sink callbacks and for the
 * allocator hooks that get installed into the z_stream.
 *
 * NOTE(review): the callback table below also names
 * bbsink_gzip_end_archive, for which no prototype is visible in this
 * listing; it was probably lost during extraction.
 */
41static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
42static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
43static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
45static void bbsink_gzip_cleanup(bbsink *sink);
46static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
47static void gzip_pfree(void *opaque, void *address);
48
/*
 * Callback table for the gzip sink; the constructor stores its address in
 * the sink's bbs_ops field.
 *
 * Archive begin/contents/end, manifest contents and cleanup use the
 * gzip-specific functions in this file.  begin_manifest, end_manifest and
 * end_backup are delegated to the generic bbsink_forward_* functions.
 */
49static const bbsink_ops bbsink_gzip_ops = {
51 .begin_archive = bbsink_gzip_begin_archive,
52 .archive_contents = bbsink_gzip_archive_contents,
53 .end_archive = bbsink_gzip_end_archive,
54 .begin_manifest = bbsink_forward_begin_manifest,
55 .manifest_contents = bbsink_gzip_manifest_contents,
56 .end_manifest = bbsink_forward_end_manifest,
57 .end_backup = bbsink_forward_end_backup,
58 .cleanup = bbsink_gzip_cleanup
59};
60#endif
61
62/*
63 * Create a new basebackup sink that performs gzip compression.
64 */
/*
 * NOTE(review): several lines of this function are missing from this
 * listing: the name/parameter line, the start of the error report in the
 * no-libz branch, the declaration and allocation of "sink", and the second
 * half of the compression-level Assert.  Restore them from the upstream
 * file.
 *
 * What is visible: the body reads "next" (must be non-NULL) and
 * "compress->level", and returns a pointer to the new sink's embedded
 * generic header.
 */
65bbsink *
67{
68#ifndef HAVE_LIBZ
/* Built without zlib: the only possible outcome is an error report. */
71 errmsg("gzip compression is not supported by this build")));
72 return NULL; /* keep compiler quiet */
73#else
75 int compresslevel;
76
77 Assert(next != NULL);
78
/* The compression level comes from the caller's specification. */
79 compresslevel = compress->level;
80 Assert((compresslevel >= 1 && compresslevel <= 9) ||
82
/*
 * Install the gzip callback table.  The store goes through a cast to
 * (const bbsink_ops **); presumably bbs_ops is declared const in bbsink
 * -- confirm against the bbsink definition.
 */
84 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
85 sink->base.bbs_next = next;
86 sink->compresslevel = compresslevel;
87
/* Callers only ever see the generic sink header. */
88 return &sink->base;
89#endif
90}
91
92#ifdef HAVE_LIBZ
93
94/*
95 * Begin backup.
96 */
/*
 * Allocate this sink's own input buffer, then start the successor sink
 * with a buffer of the same length.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing; the body uses a parameter named "sink".
 */
97static void
99{
100 /*
101 * We need our own buffer, because we're going to pass different data to
102 * the next sink than what gets passed to us.
103 */
104 sink->bbs_buffer = palloc(sink->bbs_buffer_length);
105
106 /*
107 * Since deflate() doesn't require the output buffer to be of any
108 * particular size, we can just make it the same size as the input buffer.
109 */
110 bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
111 sink->bbs_buffer_length);
112}
113
114/*
115 * Prepare to compress the next archive.
116 */
/*
 * Set up a fresh deflate stream for one archive and derive the ".gz"
 * archive name.
 *
 * sink          this gzip sink
 * archive_name  name of the uncompressed archive
 *
 * NOTE(review): this listing is missing the declaration of "mysink", the
 * remainder of the deflateInit2() call (its final argument, the result
 * check and the start of the error report), and whatever consumes
 * gz_archive_name after the Assert.  Restore them from the upstream file.
 */
117static void
118bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
119{
121 char *gz_archive_name;
122 z_stream *zs = &mysink->zstream;
123
124 /*
 * Initialize compressor object: clear it, route zlib's allocations
 * through the palloc/pfree wrappers, and point its output at the next
 * sink's buffer.
 */
125 memset(zs, 0, sizeof(z_stream));
126 zs->zalloc = gzip_palloc;
127 zs->zfree = gzip_pfree;
128 zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
129 zs->avail_out = sink->bbs_next->bbs_buffer_length;
130
131 /*
132 * We need to use deflateInit2() rather than deflateInit() here so that we
133 * can request a gzip header rather than a zlib header. Otherwise, we want
134 * to supply the same values that would have been used by default if we
135 * had just called deflateInit().
136 *
137 * Per the documentation for deflateInit2, the third argument must be
138 * Z_DEFLATED; the fourth argument is the number of "window bits", by
139 * default 15, but adding 16 gets you a gzip header rather than a zlib
140 * header; the fifth argument controls memory usage, and 8 is the default;
141 * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
142 */
143 if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
147 errmsg("could not initialize compression library"));
/*
 * Record that the stream is live, so that bbsink_gzip_cleanup() knows it
 * must call deflateEnd() if the backup fails.
 */
148 mysink->zstream_initialized = true;
149
150 /*
151 * Add ".gz" to the archive name. Note that pg_basebackup -z produces
152 * archives named ".tar.gz" rather than ".tgz", so we match that here.
153 */
154 gz_archive_name = psprintf("%s.gz", archive_name);
155 Assert(sink->bbs_next != NULL);
158}
159
160/*
161 * Compress the input data to the output buffer until we run out of input
162 * data. Each time the output buffer fills up, invoke the archive_contents()
163 * method for the next sink.
164 *
165 * Note that since we're compressing the input, it may very commonly happen
166 * that we consume all the input data without filling the output buffer. In
167 * that case, the compressed representation of the current input data won't
168 * actually be sent to the next bbsink until a later call to this function,
169 * or perhaps even not until bbsink_gzip_end_archive() is invoked.
170 */
/*
 * Feed "len" bytes from this sink's buffer through deflate(), forwarding
 * the successor sink's buffer each time it fills up.
 *
 * NOTE(review): the line carrying the function name and parameter list,
 * and the declaration of "mysink", are missing from this listing.  The
 * body uses "sink", "len" and "mysink"; the prototype earlier in the file
 * gives the signature as (bbsink *sink, size_t len).
 */
171static void
173{
175 z_stream *zs = &mysink->zstream;
176
177 /* Compress data from input buffer. */
178 zs->next_in = (uint8 *) mysink->base.bbs_buffer;
179 zs->avail_in = len;
180
/* Keep going until zlib has consumed every input byte. */
181 while (zs->avail_in > 0)
182 {
183 int res;
184
185 /* Write output data into unused portion of output buffer. */
186 Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
187 zs->next_out = (uint8 *)
188 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
189 zs->avail_out =
190 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
191
192 /*
193 * Try to compress. Note that this will update zs->next_in and
194 * zs->avail_in according to how much input data was consumed, and
195 * zs->next_out and zs->avail_out according to how many output bytes
196 * were produced.
197 *
198 * According to the zlib documentation, Z_STREAM_ERROR should only
199 * occur if we've made a programming error, or if say there's been a
200 * memory clobber; we use elog() rather than Assert() here out of an
201 * abundance of caution.
202 */
203 res = deflate(zs, Z_NO_FLUSH);
204 if (res == Z_STREAM_ERROR)
205 elog(ERROR, "could not compress data: %s", zs->msg);
206
207 /*
 * Update our notion of how many bytes we've written.  avail_out was
 * set to the free space remaining after the already-staged bytes, so
 * buffer length minus avail_out is the total staged so far, including
 * bytes left over from earlier calls.
 */
208 mysink->bytes_written =
209 mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
210
211 /*
212 * If the output buffer is full, it's time for the next sink to
213 * process the contents.
214 */
215 if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
216 {
217 bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
218 mysink->bytes_written = 0;
219 }
220 }
221}
222
223/*
224 * There might be some data inside zlib's internal buffers; we need to get
225 * that flushed out and forwarded to the successor sink as archive content.
226 *
227 * Then we can end processing for this archive.
228 */
/*
 * Drain zlib with Z_FINISH, forwarding each batch of output to the
 * successor sink, then release the deflate stream.
 *
 * NOTE(review): the line carrying the function name and parameter list,
 * the declaration of "mysink", and the statement that follows the final
 * "Must also pass on ..." comment are missing from this listing.  Restore
 * them from the upstream file.
 */
229static void
231{
233 z_stream *zs = &mysink->zstream;
234
235 /* There is no more data available. */
236 zs->next_in = (uint8 *) mysink->base.bbs_buffer;
237 zs->avail_in = 0;
238
/* Loop until a pass leaves nothing staged in the output buffer. */
239 while (1)
240 {
241 int res;
242
243 /* Write output data into unused portion of output buffer. */
244 Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
245 zs->next_out = (uint8 *)
246 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
247 zs->avail_out =
248 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
249
250 /*
251 * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
252 * no more input.
253 */
254 res = deflate(zs, Z_FINISH);
255 if (res == Z_STREAM_ERROR)
256 elog(ERROR, "could not compress data: %s", zs->msg);
257
258 /* Update our notion of how many bytes we've written. */
259 mysink->bytes_written =
260 mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
261
262 /*
263 * Apparently we had no data in the output buffer and deflate() was
264 * not able to add any. We must be done.
265 */
266 if (mysink->bytes_written == 0)
267 break;
268
269 /* Send whatever accumulated output bytes we have. */
270 bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
271 mysink->bytes_written = 0;
272 }
273
274 /*
 * Release the compression resources, and clear the flag so that
 * bbsink_gzip_cleanup() does not call deflateEnd() a second time.
 */
275 deflateEnd(zs);
276 mysink->zstream_initialized = false;
277
278 /* Must also pass on the information that this archive has ended. */
280}
281
282/*
283 * Manifest contents are not compressed, but we do need to copy them into
284 * the successor sink's buffer, because we have our own.
285 */
/*
 * Copy "len" bytes of manifest data from this sink's buffer into the
 * successor sink's buffer.
 *
 * NOTE(review): the line carrying the function name and parameter list is
 * missing from this listing, as is whatever statement follows the memcpy
 * (presumably the call that hands the copied bytes to the next sink --
 * confirm against the upstream file).
 */
286static void
288{
289 memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
291}
292
/*
 * Allocation hook handed to zlib (installed as z_stream.zalloc).
 *
 * zlib asks for "items" objects of "size" bytes each; the request is
 * satisfied with palloc().  "opaque" is zlib's private-data pointer and is
 * not used here.
 *
 * The byte count is computed in size_t rather than unsigned, so that on
 * platforms where size_t is wider than unsigned an oversized request
 * cannot wrap around to a small value before it reaches palloc().
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	(void) opaque;

	return palloc((size_t) items * (size_t) size);
}
302
/*
 * Deallocation hook handed to zlib (installed as z_stream.zfree): gives
 * the memory back through pfree().  zlib's "opaque" pointer is ignored.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;		/* present only to fit zlib's callback signature */

	pfree(address);
}
312
313/*
314 * In case the backup fails, make sure we free the compression context by
315 * calling deflateEnd() if needed to avoid a resource leak.
316 */
/*
 * Call deflateEnd() on the sink's stream if, and only if, it is still
 * marked as initialized, then clear the flag.
 *
 * NOTE(review): the line carrying the function name and parameter list,
 * and the declaration of "mysink", are missing from this listing; the
 * prototype earlier in the file gives the signature as (bbsink *sink).
 */
317static void
319{
321
/*
 * The flag is set after deflateInit2() in begin_archive and cleared after
 * deflateEnd() in end_archive, so it is true only while a stream is live.
 */
322 if (mysink->zstream_initialized)
323 {
324 deflateEnd(&mysink->zstream);
325 mysink->zstream_initialized = false;
326 }
327}
328
329#endif
bbsink * bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
static int32 next
Definition blutils.c:225
uint8_t uint8
Definition c.h:616
#define Assert(condition)
Definition c.h:945
int errcode(int sqlerrcode)
Definition elog.c:874
#define ERROR
Definition elog.h:39
#define elog(elevel,...)
Definition elog.h:226
#define ereport(elevel,...)
Definition elog.h:150
#define palloc0_object(type)
Definition fe_memutils.h:75
void pfree(void *pointer)
Definition mcxt.c:1616
void * palloc(Size size)
Definition mcxt.c:1387
static char * errmsg
const void size_t len
static int compresslevel
static int fb(int x)
char * psprintf(const char *fmt,...)
Definition psprintf.c:43
void(* begin_backup)(bbsink *sink)
static ItemArray items