/*
 * Per-streamer state for the gzip writer.
 *
 * The member list is not visible in this excerpt.  Code further down reaches
 * the members "base", "pathname" and "gzfile" through this type.
 */
41 typedef struct astreamer_gzip_writer
46 } astreamer_gzip_writer;
/*
 * Per-streamer state for the gzip decompressor.
 *
 * The member list is not visible in this excerpt.  Code further down reaches
 * the members "base", "zstream" and "bytes_written" through this type.
 */
48 typedef struct astreamer_gzip_decompressor
53 } astreamer_gzip_decompressor;
/*
 * Forward declarations for the gzip writer: the content, finalize and free
 * callbacks, plus get_gz_error(), which takes a gzFile and returns a
 * const char * used in error messages.
 *
 * The parameter list of the content callback is cut short in this excerpt.
 */
55 static void astreamer_gzip_writer_content(
astreamer *streamer,
59 static void astreamer_gzip_writer_finalize(
astreamer *streamer);
60 static void astreamer_gzip_writer_free(
astreamer *streamer);
61 static const char *get_gz_error(gzFile gzf);
/*
 * Callback table entries for the gzip writer: the content, finalize and free
 * hooks are bound to the astreamer_gzip_writer_* functions.
 *
 * The enclosing declaration is not visible in this excerpt; presumably it is
 * astreamer_gzip_writer_ops, whose address is taken elsewhere in this file
 * -- confirm.
 */
64 .
content = astreamer_gzip_writer_content,
65 .finalize = astreamer_gzip_writer_finalize,
66 .free = astreamer_gzip_writer_free
/*
 * Forward declarations for the gzip decompressor: the content, finalize and
 * free callbacks, plus gzip_palloc()/gzip_pfree(), the allocation hooks that
 * are installed as zalloc/zfree on the z_stream in this file.
 *
 * The parameter list of the content callback is cut short in this excerpt.
 */
69 static void astreamer_gzip_decompressor_content(
astreamer *streamer,
73 static void astreamer_gzip_decompressor_finalize(
astreamer *streamer);
74 static void astreamer_gzip_decompressor_free(
astreamer *streamer);
75 static void *gzip_palloc(
void *opaque,
unsigned items,
unsigned size);
76 static void gzip_pfree(
void *opaque,
void *address);
/*
 * Callback table for the gzip decompressor: binds the content, finalize and
 * free hooks to the astreamer_gzip_decompressor_* functions.  Its address is
 * taken elsewhere in this file.
 */
78 static const astreamer_ops astreamer_gzip_decompressor_ops = {
79 .
content = astreamer_gzip_decompressor_content,
80 .finalize = astreamer_gzip_decompressor_finalize,
81 .free = astreamer_gzip_decompressor_free
/*
 * Body of the gzip writer constructor (its signature and the conditions that
 * select between the branches are not visible in this excerpt).
 *
 * Visible behavior: allocates a zeroed astreamer_gzip_writer, stores a private
 * copy of pathname, obtains a gzFile either with gzopen(pathname) or with
 * gzdopen() on a duplicated descriptor of "file", applies compress->level via
 * gzsetparams(), and returns a pointer to the embedded base streamer.  Each
 * visible failure check reports the problem through pg_fatal().
 */
105 astreamer_gzip_writer *streamer;
107 streamer =
palloc0(
sizeof(astreamer_gzip_writer));
109 &astreamer_gzip_writer_ops;
/* Keep our own copy of the path; it is used later in error messages. */
111 streamer->pathname =
pstrdup(pathname);
/* Path-based open, in binary write mode. */
115 streamer->gzfile = gzopen(pathname,
"wb");
116 if (streamer->gzfile == NULL)
117 pg_fatal(
"could not create compressed file \"%s\": %m",
/*
 * Descriptor-based open.  Per the zlib manual, gzclose() also closes the
 * descriptor handed to gzdopen(), so a duplicate is passed instead --
 * presumably to leave the caller's FILE usable; confirm.
 */
126 int fd = dup(fileno(file));
129 pg_fatal(
"could not duplicate stdout: %m");
131 streamer->gzfile = gzdopen(
fd,
"wb");
132 if (streamer->gzfile == NULL)
133 pg_fatal(
"could not open output file: %m");
/* Apply the requested compression level with zlib's default strategy. */
136 if (gzsetparams(streamer->gzfile, compress->
level, Z_DEFAULT_STRATEGY) != Z_OK)
137 pg_fatal(
"could not set compression level %d: %s",
138 compress->
level, get_gz_error(streamer->gzfile));
140 return &streamer->base;
/*
 * NOTE(review): presumably the fallback for builds without zlib support;
 * the surrounding conditional compilation is not visible here -- confirm.
 */
142 pg_fatal(
"this build does not support compression with %s",
"gzip");
/*
 * Content callback of the gzip writer (parameter list cut short in this
 * excerpt).
 *
 * Writes "len" bytes from "data" to the streamer's gzFile with gzwrite().  If
 * gzwrite() returns anything other than len, the error is reported through
 * pg_fatal(), naming the output path and the text from get_gz_error().
 */
152 astreamer_gzip_writer_content(
astreamer *streamer,
156 astreamer_gzip_writer *mystreamer;
158 mystreamer = (astreamer_gzip_writer *) streamer;
/* A short write is treated as a failure. */
164 if (gzwrite(mystreamer->gzfile,
data,
len) !=
len)
169 pg_fatal(
"could not write to compressed file \"%s\": %s",
170 mystreamer->pathname, get_gz_error(mystreamer->gzfile));
/*
 * Finalize callback of the gzip writer.
 *
 * Closes the gzFile with gzclose(); a non-zero result is reported through
 * pg_fatal() together with the output path.  Afterwards the gzfile member is
 * reset to NULL, which the free callback asserts on.
 *
 * NOTE(review): the message uses %m (errno text), while gzclose() returns a
 * zlib status code; confirm errno is meaningful for every failure reported
 * here.
 */
185 astreamer_gzip_writer_finalize(
astreamer *streamer)
187 astreamer_gzip_writer *mystreamer;
189 mystreamer = (astreamer_gzip_writer *) streamer;
192 if (gzclose(mystreamer->gzfile) != 0)
193 pg_fatal(
"could not close compressed file \"%s\": %m",
194 mystreamer->pathname);
/* Mark the handle as closed. */
196 mystreamer->gzfile = NULL;
/*
 * Free callback of the gzip writer.
 *
 * Asserts that the writer has no successor streamer (bbs_next is NULL) and
 * that the gzFile has already been cleared (the finalize callback sets it to
 * NULL after closing), then releases the pathname copy made with pstrdup().
 * Any further cleanup is not visible in this excerpt.
 */
203 astreamer_gzip_writer_free(
astreamer *streamer)
205 astreamer_gzip_writer *mystreamer;
207 mystreamer = (astreamer_gzip_writer *) streamer;
209 Assert(mystreamer->base.bbs_next == NULL);
210 Assert(mystreamer->gzfile == NULL);
212 pfree(mystreamer->pathname);
/*
 * Helper that produces an error string for a gzFile.
 *
 * Queries zlib with gzerror(), which yields a message and stores the error
 * code in errnum, then tests for Z_ERRNO.  Per the zlib manual Z_ERRNO means
 * the failure came from the file system and errno should be consulted;
 * presumably that is what the branch does, but its body and the return
 * statements are not visible in this excerpt -- confirm.
 */
220 get_gz_error(gzFile gzf)
225 errmsg = gzerror(gzf, &errnum);
226 if (errnum == Z_ERRNO)
/*
 * Body of the gzip decompressor constructor (its signature is not visible in
 * this excerpt).
 *
 * Visible behavior: allocates a zeroed astreamer_gzip_decompressor, links the
 * successor streamer "next" into base.bbs_next, prepares the embedded z_stream
 * and returns a pointer to the embedded base streamer.
 */
241 astreamer_gzip_decompressor *streamer;
246 streamer =
palloc0(
sizeof(astreamer_gzip_decompressor));
248 &astreamer_gzip_decompressor_ops;
250 streamer->base.bbs_next =
next;
/*
 * Route zlib's allocations through gzip_palloc/gzip_pfree and point the
 * output side of the stream at the streamer's own buffer.
 */
254 zs = &streamer->zstream;
255 zs->zalloc = gzip_palloc;
256 zs->zfree = gzip_pfree;
257 zs->next_out = (
uint8 *) streamer->base.bbs_buffer.data;
258 zs->avail_out = streamer->base.bbs_buffer.maxlen;
/*
 * Per zlib's inflateInit2() documentation, windowBits 15 is the maximum
 * window size and adding 16 selects decoding of the gzip format.
 */
270 if (inflateInit2(zs, 15 + 16) != Z_OK)
271 pg_fatal(
"could not initialize compression library");
273 return &streamer->base;
/*
 * NOTE(review): presumably the fallback for builds without zlib support;
 * the surrounding conditional compilation is not visible here -- confirm.
 */
275 pg_fatal(
"this build does not support compression with %s",
"gzip");
/*
 * Content callback of the gzip decompressor (parameter list cut short in this
 * excerpt; the setup of the z_stream input fields is not visible either).
 *
 * Visible behavior: while the z_stream still has unconsumed input, inflate
 * into the free tail of the streamer's buffer, update bytes_written from the
 * remaining output space, and once the buffer is completely full hand its
 * contents onward and start again at offset 0.
 */
287 astreamer_gzip_decompressor_content(
astreamer *streamer,
292 astreamer_gzip_decompressor *mystreamer;
295 mystreamer = (astreamer_gzip_decompressor *) streamer;
297 zs = &mystreamer->zstream;
302 while (zs->avail_in > 0)
/* There must be room left; a full buffer is flushed at the loop's end. */
306 Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
/* Continue writing directly after the bytes already produced. */
308 zs->next_out = (
uint8 *)
309 mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
311 mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
319 res = inflate(zs, Z_NO_FLUSH);
/* The handling of Z_STREAM_ERROR is not visible in this excerpt. */
321 if (
res == Z_STREAM_ERROR)
/* Bytes produced so far = buffer capacity minus remaining output space. */
324 mystreamer->bytes_written =
325 mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
/*
 * Buffer full: the arguments below (buffer data, full capacity, context)
 * belong to a call whose callee is not visible here; afterwards the
 * fill counter restarts at zero.
 */
328 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
331 mystreamer->base.bbs_buffer.data,
332 mystreamer->base.bbs_buffer.maxlen,
context);
333 mystreamer->bytes_written = 0;
/*
 * Finalize callback of the gzip decompressor.
 *
 * Only part of the body is visible: the streamer's buffer data and
 * bbs_buffer.maxlen appear as arguments of a call whose callee and remaining
 * arguments are not shown.
 *
 * NOTE(review): the length visible here is the buffer's maxlen rather than
 * bytes_written; confirm that this is intended when the buffer is only
 * partially filled at end of stream.
 */
342 astreamer_gzip_decompressor_finalize(
astreamer *streamer)
344 astreamer_gzip_decompressor *mystreamer;
346 mystreamer = (astreamer_gzip_decompressor *) streamer;
353 mystreamer->base.bbs_buffer.data,
354 mystreamer->base.bbs_buffer.maxlen,
/*
 * Free callback of the gzip decompressor; it is registered as the .free hook
 * in astreamer_gzip_decompressor_ops.  The body is not visible in this
 * excerpt.
 */
364 astreamer_gzip_decompressor_free(
astreamer *streamer)
/*
 * Allocation hook installed as the z_stream's zalloc callback.  It receives
 * zlib's opaque pointer plus an item count and an item size.  The body is not
 * visible in this excerpt.
 */
376 gzip_palloc(
void *opaque,
unsigned items,
unsigned size)
/*
 * Deallocation hook installed as the z_stream's zfree callback.  It receives
 * zlib's opaque pointer and the address to release.  The body is not visible
 * in this excerpt.
 */
386 gzip_pfree(
void *opaque,
void *address)
static void astreamer_free(astreamer *streamer)
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_finalize(astreamer *streamer)
astreamer_archive_context
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
#define Assert(condition)
static void PGresult * res
int errmsg(const char *fmt,...)
if(TABLE==NULL||TABLE_index==NULL)
#define pg_log_error(...)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
static int fd(const char *x, int i)
static pg_noinline void Size size
void initStringInfo(StringInfo str)
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
StringInfoData bbs_buffer