PostgreSQL Source Code git master
basebackup_gzip.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 *
3 * basebackup_gzip.c
4 * Basebackup sink implementing gzip compression.
5 *
6 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
7 *
8 * IDENTIFICATION
9 * src/backend/backup/basebackup_gzip.c
10 *
11 *-------------------------------------------------------------------------
12 */
#include "postgres.h"

#ifdef HAVE_LIBZ
#include <zlib.h>
#endif

#include "backup/basebackup_sink.h"

21#ifdef HAVE_LIBZ
22typedef struct bbsink_gzip
23{
24 /* Common information for all types of sink. */
25 bbsink base;
26
27 /* Compression level. */
28 int compresslevel;
29
30 /* Compressed data stream. */
31 z_stream zstream;
32
33 /* Number of bytes staged in output buffer. */
34 size_t bytes_written;
35} bbsink_gzip;
36
37static void bbsink_gzip_begin_backup(bbsink *sink);
38static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
39static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
40static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
41static void bbsink_gzip_end_archive(bbsink *sink);
42static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
43static void gzip_pfree(void *opaque, void *address);
44
45static const bbsink_ops bbsink_gzip_ops = {
46 .begin_backup = bbsink_gzip_begin_backup,
47 .begin_archive = bbsink_gzip_begin_archive,
48 .archive_contents = bbsink_gzip_archive_contents,
49 .end_archive = bbsink_gzip_end_archive,
50 .begin_manifest = bbsink_forward_begin_manifest,
51 .manifest_contents = bbsink_gzip_manifest_contents,
52 .end_manifest = bbsink_forward_end_manifest,
53 .end_backup = bbsink_forward_end_backup,
54 .cleanup = bbsink_forward_cleanup
55};
56#endif
57
58/*
59 * Create a new basebackup sink that performs gzip compression.
60 */
61bbsink *
63{
64#ifndef HAVE_LIBZ
66 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67 errmsg("gzip compression is not supported by this build")));
68 return NULL; /* keep compiler quiet */
69#else
70 bbsink_gzip *sink;
71 int compresslevel;
72
73 Assert(next != NULL);
74
75 compresslevel = compress->level;
76 Assert((compresslevel >= 1 && compresslevel <= 9) ||
77 compresslevel == Z_DEFAULT_COMPRESSION);
78
79 sink = palloc0(sizeof(bbsink_gzip));
80 *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
81 sink->base.bbs_next = next;
82 sink->compresslevel = compresslevel;
83
84 return &sink->base;
85#endif
86}
87
88#ifdef HAVE_LIBZ
89
90/*
91 * Begin backup.
92 */
93static void
94bbsink_gzip_begin_backup(bbsink *sink)
95{
96 /*
97 * We need our own buffer, because we're going to pass different data to
98 * the next sink than what gets passed to us.
99 */
100 sink->bbs_buffer = palloc(sink->bbs_buffer_length);
101
102 /*
103 * Since deflate() doesn't require the output buffer to be of any
104 * particular size, we can just make it the same size as the input buffer.
105 */
107 sink->bbs_buffer_length);
108}
109
110/*
111 * Prepare to compress the next archive.
112 */
113static void
114bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
115{
116 bbsink_gzip *mysink = (bbsink_gzip *) sink;
117 char *gz_archive_name;
118 z_stream *zs = &mysink->zstream;
119
120 /* Initialize compressor object. */
121 memset(zs, 0, sizeof(z_stream));
122 zs->zalloc = gzip_palloc;
123 zs->zfree = gzip_pfree;
124 zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
125 zs->avail_out = sink->bbs_next->bbs_buffer_length;
126
127 /*
128 * We need to use deflateInit2() rather than deflateInit() here so that we
129 * can request a gzip header rather than a zlib header. Otherwise, we want
130 * to supply the same values that would have been used by default if we
131 * had just called deflateInit().
132 *
133 * Per the documentation for deflateInit2, the third argument must be
134 * Z_DEFLATED; the fourth argument is the number of "window bits", by
135 * default 15, but adding 16 gets you a gzip header rather than a zlib
136 * header; the fifth argument controls memory usage, and 8 is the default;
137 * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
138 */
139 if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
140 Z_DEFAULT_STRATEGY) != Z_OK)
142 errcode(ERRCODE_INTERNAL_ERROR),
143 errmsg("could not initialize compression library"));
144
145 /*
146 * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
147 * archives named ".tar.gz" rather than ".tgz", so we match that here.
148 */
149 gz_archive_name = psprintf("%s.gz", archive_name);
150 Assert(sink->bbs_next != NULL);
151 bbsink_begin_archive(sink->bbs_next, gz_archive_name);
152 pfree(gz_archive_name);
153}
154
155/*
156 * Compress the input data to the output buffer until we run out of input
157 * data. Each time the output buffer fills up, invoke the archive_contents()
158 * method for then next sink.
159 *
160 * Note that since we're compressing the input, it may very commonly happen
161 * that we consume all the input data without filling the output buffer. In
162 * that case, the compressed representation of the current input data won't
163 * actually be sent to the next bbsink until a later call to this function,
164 * or perhaps even not until bbsink_gzip_end_archive() is invoked.
165 */
166static void
167bbsink_gzip_archive_contents(bbsink *sink, size_t len)
168{
169 bbsink_gzip *mysink = (bbsink_gzip *) sink;
170 z_stream *zs = &mysink->zstream;
171
172 /* Compress data from input buffer. */
173 zs->next_in = (uint8 *) mysink->base.bbs_buffer;
174 zs->avail_in = len;
175
176 while (zs->avail_in > 0)
177 {
178 int res;
179
180 /* Write output data into unused portion of output buffer. */
181 Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
182 zs->next_out = (uint8 *)
183 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
184 zs->avail_out =
185 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
186
187 /*
188 * Try to compress. Note that this will update zs->next_in and
189 * zs->avail_in according to how much input data was consumed, and
190 * zs->next_out and zs->avail_out according to how many output bytes
191 * were produced.
192 *
193 * According to the zlib documentation, Z_STREAM_ERROR should only
194 * occur if we've made a programming error, or if say there's been a
195 * memory clobber; we use elog() rather than Assert() here out of an
196 * abundance of caution.
197 */
198 res = deflate(zs, Z_NO_FLUSH);
199 if (res == Z_STREAM_ERROR)
200 elog(ERROR, "could not compress data: %s", zs->msg);
201
202 /* Update our notion of how many bytes we've written. */
203 mysink->bytes_written =
204 mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
205
206 /*
207 * If the output buffer is full, it's time for the next sink to
208 * process the contents.
209 */
210 if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
211 {
212 bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
213 mysink->bytes_written = 0;
214 }
215 }
216}
217
218/*
219 * There might be some data inside zlib's internal buffers; we need to get
220 * that flushed out and forwarded to the successor sink as archive content.
221 *
222 * Then we can end processing for this archive.
223 */
224static void
225bbsink_gzip_end_archive(bbsink *sink)
226{
227 bbsink_gzip *mysink = (bbsink_gzip *) sink;
228 z_stream *zs = &mysink->zstream;
229
230 /* There is no more data available. */
231 zs->next_in = (uint8 *) mysink->base.bbs_buffer;
232 zs->avail_in = 0;
233
234 while (1)
235 {
236 int res;
237
238 /* Write output data into unused portion of output buffer. */
239 Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
240 zs->next_out = (uint8 *)
241 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
242 zs->avail_out =
243 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
244
245 /*
246 * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
247 * no more input.
248 */
249 res = deflate(zs, Z_FINISH);
250 if (res == Z_STREAM_ERROR)
251 elog(ERROR, "could not compress data: %s", zs->msg);
252
253 /* Update our notion of how many bytes we've written. */
254 mysink->bytes_written =
255 mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
256
257 /*
258 * Apparently we had no data in the output buffer and deflate() was
259 * not able to add any. We must be done.
260 */
261 if (mysink->bytes_written == 0)
262 break;
263
264 /* Send whatever accumulated output bytes we have. */
265 bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
266 mysink->bytes_written = 0;
267 }
268
269 /* Must also pass on the information that this archive has ended. */
271}
272
273/*
274 * Manifest contents are not compressed, but we do need to copy them into
275 * the successor sink's buffer, because we have our own.
276 */
277static void
278bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
279{
280 memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
282}
283
284/*
285 * Wrapper function to adjust the signature of palloc to match what libz
286 * expects.
287 */
288static void *
289gzip_palloc(void *opaque, unsigned items, unsigned size)
290{
291 return palloc(items * size);
292}
293
/*
 * zfree hook for zlib: returns memory obtained through gzip_palloc() with
 * pfree().  zlib's opaque cookie is not needed and is ignored.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;				/* unused */
	pfree(address);
}
303
304#endif
bbsink * bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
static int32 next
Definition: blutils.c:221
uint8_t uint8
Definition: c.h:486
#define Assert(condition)
Definition: c.h:815
int errcode(int sqlerrcode)
Definition: elog.c:853
int errmsg(const char *fmt,...)
Definition: elog.c:1070
#define ERROR
Definition: elog.h:39
#define elog(elevel,...)
Definition: elog.h:225
#define ereport(elevel,...)
Definition: elog.h:149
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
void pfree(void *pointer)
Definition: mcxt.c:1521
void * palloc0(Size size)
Definition: mcxt.c:1347
void * palloc(Size size)
Definition: mcxt.c:1317
const void size_t len
while(p+4<=pend)
static int compresslevel
Definition: pg_receivewal.c:45
char * psprintf(const char *fmt,...)
Definition: psprintf.c:43
static pg_noinline void Size size
Definition: slab.c:607
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
size_t bbs_buffer_length
static ItemArray items
Definition: test_tidstore.c:48