PostgreSQL Source Code  git master
basebackup_gzip.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_gzip.c
4  * Basebackup sink implementing gzip compression.
5  *
6  * Portions Copyright (c) 2010-2022, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/replication/basebackup_gzip.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #ifdef HAVE_LIBZ
16 #include <zlib.h>
17 #endif
18 
20 
21 #ifdef HAVE_LIBZ
22 typedef struct bbsink_gzip
23 {
24  /* Common information for all types of sink. */
25  bbsink base;
26 
27  /* Compression level. */
28  int compresslevel;
29 
30  /* Compressed data stream. */
31  z_stream zstream;
32 
33  /* Number of bytes staged in output buffer. */
34  size_t bytes_written;
35 } bbsink_gzip;
36 
37 static void bbsink_gzip_begin_backup(bbsink *sink);
38 static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
39 static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
40 static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
41 static void bbsink_gzip_end_archive(bbsink *sink);
42 static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
43 static void gzip_pfree(void *opaque, void *address);
44 
45 static const bbsink_ops bbsink_gzip_ops = {
46  .begin_backup = bbsink_gzip_begin_backup,
47  .begin_archive = bbsink_gzip_begin_archive,
48  .archive_contents = bbsink_gzip_archive_contents,
49  .end_archive = bbsink_gzip_end_archive,
50  .begin_manifest = bbsink_forward_begin_manifest,
51  .manifest_contents = bbsink_gzip_manifest_contents,
52  .end_manifest = bbsink_forward_end_manifest,
53  .end_backup = bbsink_forward_end_backup,
54  .cleanup = bbsink_forward_cleanup
55 };
56 #endif
57 
58 /*
59  * Create a new basebackup sink that performs gzip compression.
60  */
61 bbsink *
63 {
64 #ifndef HAVE_LIBZ
65  ereport(ERROR,
66  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67  errmsg("gzip compression is not supported by this build")));
68  return NULL; /* keep compiler quiet */
69 #else
70  bbsink_gzip *sink;
71  int compresslevel;
72 
73  Assert(next != NULL);
74 
75  if ((compress->options & PG_COMPRESSION_OPTION_LEVEL) == 0)
77  else
78  {
79  compresslevel = compress->level;
80  Assert(compresslevel >= 1 && compresslevel <= 9);
81  }
82 
83  sink = palloc0(sizeof(bbsink_gzip));
84  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
85  sink->base.bbs_next = next;
86  sink->compresslevel = compresslevel;
87 
88  return &sink->base;
89 #endif
90 }
91 
92 #ifdef HAVE_LIBZ
93 
94 /*
95  * Begin backup.
96  */
97 static void
98 bbsink_gzip_begin_backup(bbsink *sink)
99 {
100  /*
101  * We need our own buffer, because we're going to pass different data to
102  * the next sink than what gets passed to us.
103  */
104  sink->bbs_buffer = palloc(sink->bbs_buffer_length);
105 
106  /*
107  * Since deflate() doesn't require the output buffer to be of any
108  * particular size, we can just make it the same size as the input buffer.
109  */
111  sink->bbs_buffer_length);
112 }
113 
114 /*
115  * Prepare to compress the next archive.
116  */
117 static void
118 bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
119 {
120  bbsink_gzip *mysink = (bbsink_gzip *) sink;
121  char *gz_archive_name;
122  z_stream *zs = &mysink->zstream;
123 
124  /* Initialize compressor object. */
125  memset(zs, 0, sizeof(z_stream));
126  zs->zalloc = gzip_palloc;
127  zs->zfree = gzip_pfree;
128  zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
129  zs->avail_out = sink->bbs_next->bbs_buffer_length;
130 
131  /*
132  * We need to use deflateInit2() rather than deflateInit() here so that we
133  * can request a gzip header rather than a zlib header. Otherwise, we want
134  * to supply the same values that would have been used by default if we
135  * had just called deflateInit().
136  *
137  * Per the documentation for deflateInit2, the third argument must be
138  * Z_DEFLATED; the fourth argument is the number of "window bits", by
139  * default 15, but adding 16 gets you a gzip header rather than a zlib
140  * header; the fifth argument controls memory usage, and 8 is the default;
141  * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
142  */
143  if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
144  Z_DEFAULT_STRATEGY) != Z_OK)
145  ereport(ERROR,
146  errcode(ERRCODE_INTERNAL_ERROR),
147  errmsg("could not initialize compression library"));
148 
149  /*
150  * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
151  * archives named ".tar.gz" rather than ".tgz", so we match that here.
152  */
153  gz_archive_name = psprintf("%s.gz", archive_name);
154  Assert(sink->bbs_next != NULL);
155  bbsink_begin_archive(sink->bbs_next, gz_archive_name);
156  pfree(gz_archive_name);
157 }
158 
159 /*
160  * Compress the input data to the output buffer until we run out of input
161  * data. Each time the output buffer fills up, invoke the archive_contents()
162  * method for then next sink.
163  *
164  * Note that since we're compressing the input, it may very commonly happen
165  * that we consume all the input data without filling the output buffer. In
166  * that case, the compressed representation of the current input data won't
167  * actually be sent to the next bbsink until a later call to this function,
168  * or perhaps even not until bbsink_gzip_end_archive() is invoked.
169  */
170 static void
171 bbsink_gzip_archive_contents(bbsink *sink, size_t len)
172 {
173  bbsink_gzip *mysink = (bbsink_gzip *) sink;
174  z_stream *zs = &mysink->zstream;
175 
176  /* Compress data from input buffer. */
177  zs->next_in = (uint8 *) mysink->base.bbs_buffer;
178  zs->avail_in = len;
179 
180  while (zs->avail_in > 0)
181  {
182  int res;
183 
184  /* Write output data into unused portion of output buffer. */
185  Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
186  zs->next_out = (uint8 *)
187  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
188  zs->avail_out =
189  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
190 
191  /*
192  * Try to compress. Note that this will update zs->next_in and
193  * zs->avail_in according to how much input data was consumed, and
194  * zs->next_out and zs->avail_out according to how many output bytes
195  * were produced.
196  *
197  * According to the zlib documentation, Z_STREAM_ERROR should only
198  * occur if we've made a programming error, or if say there's been a
199  * memory clobber; we use elog() rather than Assert() here out of an
200  * abundance of caution.
201  */
202  res = deflate(zs, Z_NO_FLUSH);
203  if (res == Z_STREAM_ERROR)
204  elog(ERROR, "could not compress data: %s", zs->msg);
205 
206  /* Update our notion of how many bytes we've written. */
207  mysink->bytes_written =
208  mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
209 
210  /*
211  * If the output buffer is full, it's time for the next sink to
212  * process the contents.
213  */
214  if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
215  {
216  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
217  mysink->bytes_written = 0;
218  }
219  }
220 }
221 
222 /*
223  * There might be some data inside zlib's internal buffers; we need to get
224  * that flushed out and forwarded to the successor sink as archive content.
225  *
226  * Then we can end processing for this archive.
227  */
228 static void
229 bbsink_gzip_end_archive(bbsink *sink)
230 {
231  bbsink_gzip *mysink = (bbsink_gzip *) sink;
232  z_stream *zs = &mysink->zstream;
233 
234  /* There is no more data available. */
235  zs->next_in = (uint8 *) mysink->base.bbs_buffer;
236  zs->avail_in = 0;
237 
238  while (1)
239  {
240  int res;
241 
242  /* Write output data into unused portion of output buffer. */
243  Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
244  zs->next_out = (uint8 *)
245  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
246  zs->avail_out =
247  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
248 
249  /*
250  * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
251  * no more input.
252  */
253  res = deflate(zs, Z_FINISH);
254  if (res == Z_STREAM_ERROR)
255  elog(ERROR, "could not compress data: %s", zs->msg);
256 
257  /* Update our notion of how many bytes we've written. */
258  mysink->bytes_written =
259  mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
260 
261  /*
262  * Apparently we had no data in the output buffer and deflate() was
263  * not able to add any. We must be done.
264  */
265  if (mysink->bytes_written == 0)
266  break;
267 
268  /* Send whatever accumulated output bytes we have. */
269  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
270  mysink->bytes_written = 0;
271  }
272 
273  /* Must also pass on the information that this archive has ended. */
275 }
276 
277 /*
278  * Manifest contents are not compressed, but we do need to copy them into
279  * the successor sink's buffer, because we have our own.
280  */
281 static void
282 bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
283 {
284  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
286 }
287 
/*
 * Wrapper function to adjust the signature of palloc to match what libz
 * expects.
 *
 * libz asks for 'items' objects of 'size' bytes each.  The product is
 * computed in size_t rather than unsigned so that it cannot silently wrap
 * around and produce an undersized allocation.  If the product does not fit
 * even in size_t, return NULL, which zlib treats as an out-of-memory
 * condition.  'opaque' is unused.
 */
static void *
gzip_palloc(void *opaque, unsigned items, unsigned size)
{
	size_t		nbytes = (size_t) items * (size_t) size;

	/* Detect wraparound of the multiplication. */
	if (items != 0 && nbytes / items != size)
		return NULL;

	return palloc(nbytes);
}
297 
/*
 * Adapter giving pfree the two-argument signature that libz requires of its
 * zfree hook.  Only 'address' is used; 'opaque' is ignored.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	(void) opaque;
	pfree(address);
}
307 
308 #endif
bbsink * bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:439
#define PG_COMPRESSION_OPTION_LEVEL
Definition: compression.h:25
int errcode(int sqlerrcode)
Definition: elog.c:693
int errmsg(const char *fmt,...)
Definition: elog.c:904
#define ERROR
Definition: elog.h:33
#define elog(elevel,...)
Definition: elog.h:218
#define ereport(elevel,...)
Definition: elog.h:143
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1175
void * palloc0(Size size)
Definition: mcxt.c:1099
void * palloc(Size size)
Definition: mcxt.c:1068
#define Z_DEFAULT_COMPRESSION
const void size_t len
while(p+4<=pend)
static int compresslevel
Definition: pg_receivewal.c:45
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
size_t bbs_buffer_length