39typedef struct astreamer_gzip_writer
44} astreamer_gzip_writer;
46typedef struct astreamer_gzip_decompressor
51} astreamer_gzip_decompressor;
53static void astreamer_gzip_writer_content(
astreamer *streamer,
57static void astreamer_gzip_writer_finalize(
astreamer *streamer);
58static void astreamer_gzip_writer_free(
astreamer *streamer);
59static const char *get_gz_error(gzFile gzf);
62 .
content = astreamer_gzip_writer_content,
63 .finalize = astreamer_gzip_writer_finalize,
64 .free = astreamer_gzip_writer_free
67static void astreamer_gzip_decompressor_content(
astreamer *streamer,
71static void astreamer_gzip_decompressor_finalize(
astreamer *streamer);
72static void astreamer_gzip_decompressor_free(
astreamer *streamer);
73static void *gzip_palloc(
void *opaque,
unsigned items,
unsigned size);
74static void gzip_pfree(
void *opaque,
void *address);
77 .
content = astreamer_gzip_decompressor_content,
78 .finalize = astreamer_gzip_decompressor_finalize,
79 .free = astreamer_gzip_decompressor_free
103 astreamer_gzip_writer *streamer;
105 streamer =
palloc0(
sizeof(astreamer_gzip_writer));
107 &astreamer_gzip_writer_ops;
109 streamer->pathname =
pstrdup(pathname);
113 streamer->gzfile = gzopen(pathname,
"wb");
114 if (streamer->gzfile == NULL)
115 pg_fatal(
"could not create compressed file \"%s\": %m",
124 int fd = dup(fileno(file));
127 pg_fatal(
"could not duplicate stdout: %m");
129 streamer->gzfile = gzdopen(
fd,
"wb");
130 if (streamer->gzfile == NULL)
131 pg_fatal(
"could not open output file: %m");
134 if (gzsetparams(streamer->gzfile, compress->
level, Z_DEFAULT_STRATEGY) != Z_OK)
135 pg_fatal(
"could not set compression level %d: %s",
136 compress->
level, get_gz_error(streamer->gzfile));
138 return &streamer->base;
140 pg_fatal(
"this build does not support compression with %s",
"gzip");
150astreamer_gzip_writer_content(
astreamer *streamer,
154 astreamer_gzip_writer *mystreamer;
156 mystreamer = (astreamer_gzip_writer *) streamer;
162 if (gzwrite(mystreamer->gzfile,
data,
len) !=
len)
167 pg_fatal(
"could not write to compressed file \"%s\": %s",
168 mystreamer->pathname, get_gz_error(mystreamer->gzfile));
183astreamer_gzip_writer_finalize(
astreamer *streamer)
185 astreamer_gzip_writer *mystreamer;
187 mystreamer = (astreamer_gzip_writer *) streamer;
190 if (gzclose(mystreamer->gzfile) != 0)
191 pg_fatal(
"could not close compressed file \"%s\": %m",
192 mystreamer->pathname);
194 mystreamer->gzfile = NULL;
201astreamer_gzip_writer_free(
astreamer *streamer)
203 astreamer_gzip_writer *mystreamer;
205 mystreamer = (astreamer_gzip_writer *) streamer;
207 Assert(mystreamer->base.bbs_next == NULL);
208 Assert(mystreamer->gzfile == NULL);
210 pfree(mystreamer->pathname);
218get_gz_error(gzFile gzf)
223 errmsg = gzerror(gzf, &errnum);
224 if (errnum == Z_ERRNO)
239 astreamer_gzip_decompressor *streamer;
244 streamer =
palloc0(
sizeof(astreamer_gzip_decompressor));
246 &astreamer_gzip_decompressor_ops;
248 streamer->base.bbs_next =
next;
252 zs = &streamer->zstream;
253 zs->zalloc = gzip_palloc;
254 zs->zfree = gzip_pfree;
255 zs->next_out = (
uint8 *) streamer->base.bbs_buffer.data;
256 zs->avail_out = streamer->base.bbs_buffer.maxlen;
268 if (inflateInit2(zs, 15 + 16) != Z_OK)
269 pg_fatal(
"could not initialize compression library");
271 return &streamer->base;
273 pg_fatal(
"this build does not support compression with %s",
"gzip");
285astreamer_gzip_decompressor_content(
astreamer *streamer,
290 astreamer_gzip_decompressor *mystreamer;
293 mystreamer = (astreamer_gzip_decompressor *) streamer;
295 zs = &mystreamer->zstream;
300 while (zs->avail_in > 0)
304 Assert(mystreamer->bytes_written < mystreamer->base.bbs_buffer.maxlen);
306 zs->next_out = (
uint8 *)
307 mystreamer->base.bbs_buffer.data + mystreamer->bytes_written;
309 mystreamer->base.bbs_buffer.maxlen - mystreamer->bytes_written;
317 res = inflate(zs, Z_NO_FLUSH);
319 if (res == Z_STREAM_ERROR)
322 mystreamer->bytes_written =
323 mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
326 if (mystreamer->bytes_written >= mystreamer->base.bbs_buffer.maxlen)
329 mystreamer->base.bbs_buffer.data,
330 mystreamer->base.bbs_buffer.maxlen, context);
331 mystreamer->bytes_written = 0;
340astreamer_gzip_decompressor_finalize(
astreamer *streamer)
342 astreamer_gzip_decompressor *mystreamer;
344 mystreamer = (astreamer_gzip_decompressor *) streamer;
351 mystreamer->base.bbs_buffer.data,
352 mystreamer->base.bbs_buffer.maxlen,
362astreamer_gzip_decompressor_free(
astreamer *streamer)
374gzip_palloc(
void *opaque,
unsigned items,
unsigned size)
384gzip_pfree(
void *opaque,
void *address)
static void astreamer_free(astreamer *streamer)
static void astreamer_content(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
static void astreamer_finalize(astreamer *streamer)
astreamer_archive_context
astreamer * astreamer_gzip_decompressor_new(astreamer *next)
astreamer * astreamer_gzip_writer_new(char *pathname, FILE *file, pg_compress_specification *compress)
int errmsg(const char *fmt,...)
Assert(PointerIsAligned(start, uint64))
if(TABLE==NULL||TABLE_index==NULL)
#define pg_log_error(...)
char * pstrdup(const char *in)
void pfree(void *pointer)
void * palloc0(Size size)
static int fd(const char *x, int i)
void initStringInfo(StringInfo str)
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
StringInfoData bbs_buffer