PostgreSQL Source Code  git master
basebackup_gzip.c
Go to the documentation of this file.
1 /*-------------------------------------------------------------------------
2  *
3  * basebackup_gzip.c
4  * Basebackup sink implementing gzip compression.
5  *
6  * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
7  *
8  * IDENTIFICATION
9  * src/backend/backup/basebackup_gzip.c
10  *
11  *-------------------------------------------------------------------------
12  */
13 #include "postgres.h"
14 
15 #ifdef HAVE_LIBZ
16 #include <zlib.h>
17 #endif
18 
19 #include "backup/basebackup_sink.h"
20 
21 #ifdef HAVE_LIBZ
22 typedef struct bbsink_gzip
23 {
24  /* Common information for all types of sink. */
25  bbsink base;
26 
27  /* Compression level. */
28  int compresslevel;
29 
30  /* Compressed data stream. */
31  z_stream zstream;
32 
33  /* Number of bytes staged in output buffer. */
34  size_t bytes_written;
35 } bbsink_gzip;
36 
37 static void bbsink_gzip_begin_backup(bbsink *sink);
38 static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
39 static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
40 static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
41 static void bbsink_gzip_end_archive(bbsink *sink);
42 static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
43 static void gzip_pfree(void *opaque, void *address);
44 
45 static const bbsink_ops bbsink_gzip_ops = {
46  .begin_backup = bbsink_gzip_begin_backup,
47  .begin_archive = bbsink_gzip_begin_archive,
48  .archive_contents = bbsink_gzip_archive_contents,
49  .end_archive = bbsink_gzip_end_archive,
50  .begin_manifest = bbsink_forward_begin_manifest,
51  .manifest_contents = bbsink_gzip_manifest_contents,
52  .end_manifest = bbsink_forward_end_manifest,
53  .end_backup = bbsink_forward_end_backup,
54  .cleanup = bbsink_forward_cleanup
55 };
56 #endif
57 
58 /*
59  * Create a new basebackup sink that performs gzip compression.
60  */
61 bbsink *
63 {
64 #ifndef HAVE_LIBZ
65  ereport(ERROR,
66  (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67  errmsg("gzip compression is not supported by this build")));
68  return NULL; /* keep compiler quiet */
69 #else
70  bbsink_gzip *sink;
71  int compresslevel;
72 
73  Assert(next != NULL);
74 
75  compresslevel = compress->level;
76  Assert((compresslevel >= 1 && compresslevel <= 9) ||
78 
79  sink = palloc0(sizeof(bbsink_gzip));
80  *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
81  sink->base.bbs_next = next;
82  sink->compresslevel = compresslevel;
83 
84  return &sink->base;
85 #endif
86 }
87 
88 #ifdef HAVE_LIBZ
89 
90 /*
91  * Begin backup.
92  */
93 static void
94 bbsink_gzip_begin_backup(bbsink *sink)
95 {
96  /*
97  * We need our own buffer, because we're going to pass different data to
98  * the next sink than what gets passed to us.
99  */
100  sink->bbs_buffer = palloc(sink->bbs_buffer_length);
101 
102  /*
103  * Since deflate() doesn't require the output buffer to be of any
104  * particular size, we can just make it the same size as the input buffer.
105  */
107  sink->bbs_buffer_length);
108 }
109 
110 /*
111  * Prepare to compress the next archive.
112  */
113 static void
114 bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
115 {
116  bbsink_gzip *mysink = (bbsink_gzip *) sink;
117  char *gz_archive_name;
118  z_stream *zs = &mysink->zstream;
119 
120  /* Initialize compressor object. */
121  memset(zs, 0, sizeof(z_stream));
122  zs->zalloc = gzip_palloc;
123  zs->zfree = gzip_pfree;
124  zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
125  zs->avail_out = sink->bbs_next->bbs_buffer_length;
126 
127  /*
128  * We need to use deflateInit2() rather than deflateInit() here so that we
129  * can request a gzip header rather than a zlib header. Otherwise, we want
130  * to supply the same values that would have been used by default if we
131  * had just called deflateInit().
132  *
133  * Per the documentation for deflateInit2, the third argument must be
134  * Z_DEFLATED; the fourth argument is the number of "window bits", by
135  * default 15, but adding 16 gets you a gzip header rather than a zlib
136  * header; the fifth argument controls memory usage, and 8 is the default;
137  * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
138  */
139  if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
140  Z_DEFAULT_STRATEGY) != Z_OK)
141  ereport(ERROR,
142  errcode(ERRCODE_INTERNAL_ERROR),
143  errmsg("could not initialize compression library"));
144 
145  /*
146  * Add ".gz" to the archive name. Note that the pg_basebackup -z produces
147  * archives named ".tar.gz" rather than ".tgz", so we match that here.
148  */
149  gz_archive_name = psprintf("%s.gz", archive_name);
150  Assert(sink->bbs_next != NULL);
151  bbsink_begin_archive(sink->bbs_next, gz_archive_name);
152  pfree(gz_archive_name);
153 }
154 
155 /*
156  * Compress the input data to the output buffer until we run out of input
157  * data. Each time the output buffer fills up, invoke the archive_contents()
158  * method for then next sink.
159  *
160  * Note that since we're compressing the input, it may very commonly happen
161  * that we consume all the input data without filling the output buffer. In
162  * that case, the compressed representation of the current input data won't
163  * actually be sent to the next bbsink until a later call to this function,
164  * or perhaps even not until bbsink_gzip_end_archive() is invoked.
165  */
166 static void
167 bbsink_gzip_archive_contents(bbsink *sink, size_t len)
168 {
169  bbsink_gzip *mysink = (bbsink_gzip *) sink;
170  z_stream *zs = &mysink->zstream;
171 
172  /* Compress data from input buffer. */
173  zs->next_in = (uint8 *) mysink->base.bbs_buffer;
174  zs->avail_in = len;
175 
176  while (zs->avail_in > 0)
177  {
178  int res;
179 
180  /* Write output data into unused portion of output buffer. */
181  Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
182  zs->next_out = (uint8 *)
183  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
184  zs->avail_out =
185  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
186 
187  /*
188  * Try to compress. Note that this will update zs->next_in and
189  * zs->avail_in according to how much input data was consumed, and
190  * zs->next_out and zs->avail_out according to how many output bytes
191  * were produced.
192  *
193  * According to the zlib documentation, Z_STREAM_ERROR should only
194  * occur if we've made a programming error, or if say there's been a
195  * memory clobber; we use elog() rather than Assert() here out of an
196  * abundance of caution.
197  */
198  res = deflate(zs, Z_NO_FLUSH);
199  if (res == Z_STREAM_ERROR)
200  elog(ERROR, "could not compress data: %s", zs->msg);
201 
202  /* Update our notion of how many bytes we've written. */
203  mysink->bytes_written =
204  mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
205 
206  /*
207  * If the output buffer is full, it's time for the next sink to
208  * process the contents.
209  */
210  if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
211  {
212  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
213  mysink->bytes_written = 0;
214  }
215  }
216 }
217 
218 /*
219  * There might be some data inside zlib's internal buffers; we need to get
220  * that flushed out and forwarded to the successor sink as archive content.
221  *
222  * Then we can end processing for this archive.
223  */
224 static void
225 bbsink_gzip_end_archive(bbsink *sink)
226 {
227  bbsink_gzip *mysink = (bbsink_gzip *) sink;
228  z_stream *zs = &mysink->zstream;
229 
230  /* There is no more data available. */
231  zs->next_in = (uint8 *) mysink->base.bbs_buffer;
232  zs->avail_in = 0;
233 
234  while (1)
235  {
236  int res;
237 
238  /* Write output data into unused portion of output buffer. */
239  Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
240  zs->next_out = (uint8 *)
241  mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
242  zs->avail_out =
243  mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
244 
245  /*
246  * As bbsink_gzip_archive_contents, but pass Z_FINISH since there is
247  * no more input.
248  */
249  res = deflate(zs, Z_FINISH);
250  if (res == Z_STREAM_ERROR)
251  elog(ERROR, "could not compress data: %s", zs->msg);
252 
253  /* Update our notion of how many bytes we've written. */
254  mysink->bytes_written =
255  mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
256 
257  /*
258  * Apparently we had no data in the output buffer and deflate() was
259  * not able to add any. We must be done.
260  */
261  if (mysink->bytes_written == 0)
262  break;
263 
264  /* Send whatever accumulated output bytes we have. */
265  bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
266  mysink->bytes_written = 0;
267  }
268 
269  /* Must also pass on the information that this archive has ended. */
271 }
272 
273 /*
274  * Manifest contents are not compressed, but we do need to copy them into
275  * the successor sink's buffer, because we have our own.
276  */
277 static void
278 bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
279 {
280  memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
282 }
283 
284 /*
285  * Wrapper function to adjust the signature of palloc to match what libz
286  * expects.
287  */
288 static void *
289 gzip_palloc(void *opaque, unsigned items, unsigned size)
290 {
291  return palloc(items * size);
292 }
293 
/*
 * Adapter that lets libz release memory through pfree.
 *
 * libz's free callback takes an extra "opaque" argument ahead of the
 * pointer to release; pfree has no use for it, so it is simply dropped.
 */
static void
gzip_pfree(void *opaque, void *address)
{
	pfree(address);
}
303 
304 #endif
bbsink * bbsink_gzip_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_cleanup(bbsink *sink)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:488
int errcode(int sqlerrcode)
Definition: elog.c:858
int errmsg(const char *fmt,...)
Definition: elog.c:1069
#define ERROR
Definition: elog.h:39
#define ereport(elevel,...)
Definition: elog.h:149
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:77
Assert(fmt[strlen(fmt) - 1] !='\n')
void pfree(void *pointer)
Definition: mcxt.c:1436
void * palloc0(Size size)
Definition: mcxt.c:1241
void * palloc(Size size)
Definition: mcxt.c:1210
#define Z_DEFAULT_COMPRESSION
const void size_t len
while(p+4<=pend)
static int compresslevel
Definition: pg_receivewal.c:45
char * psprintf(const char *fmt,...)
Definition: psprintf.c:46
void(* begin_backup)(bbsink *sink)
bbsink * bbs_next
bbsink_state * bbs_state
char * bbs_buffer
size_t bbs_buffer_length