PostgreSQL Source Code git master
Loading...
Searching...
No Matches
astreamer.h File Reference
#include "common/compression.h"
#include "lib/stringinfo.h"
#include "pqexpbuffer.h"
Include dependency graph for astreamer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  astreamer_member
 
struct  astreamer
 
struct  astreamer_ops
 

Typedefs

typedef struct astreamer astreamer
 
typedef struct astreamer_ops astreamer_ops
 

Enumerations

enum  astreamer_archive_context {
  ASTREAMER_UNKNOWN , ASTREAMER_MEMBER_HEADER , ASTREAMER_MEMBER_CONTENTS , ASTREAMER_MEMBER_TRAILER ,
  ASTREAMER_ARCHIVE_TRAILER
}
 

Functions

static void astreamer_content (astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
 
static void astreamer_finalize (astreamer *streamer)
 
static void astreamer_free (astreamer *streamer)
 
static void astreamer_buffer_bytes (astreamer *streamer, const char **data, int *len, int nbytes)
 
static bool astreamer_buffer_until (astreamer *streamer, const char **data, int *len, int target_bytes)
 
astreamer * astreamer_plain_writer_new (char *pathname, FILE *file)
 
astreamer * astreamer_gzip_writer_new (char *pathname, FILE *file, pg_compress_specification *compress)
 
astreamer * astreamer_extractor_new (const char *basepath, const char *(*link_map)(const char *), void(*report_output_file)(const char *))
 
astreamer * astreamer_gzip_decompressor_new (astreamer *next)
 
astreamer * astreamer_lz4_compressor_new (astreamer *next, pg_compress_specification *compress)
 
astreamer * astreamer_lz4_decompressor_new (astreamer *next)
 
astreamer * astreamer_zstd_compressor_new (astreamer *next, pg_compress_specification *compress)
 
astreamer * astreamer_zstd_decompressor_new (astreamer *next)
 
astreamer * astreamer_tar_parser_new (astreamer *next)
 
astreamer * astreamer_tar_terminator_new (astreamer *next)
 
astreamer * astreamer_tar_archiver_new (astreamer *next)
 

Typedef Documentation

◆ astreamer

Definition at line 40 of file astreamer.h.

◆ astreamer_ops

Definition at line 41 of file astreamer.h.

Enumeration Type Documentation

◆ astreamer_archive_context

Enumerator
ASTREAMER_UNKNOWN 
ASTREAMER_MEMBER_HEADER 
ASTREAMER_MEMBER_CONTENTS 
ASTREAMER_MEMBER_TRAILER 
ASTREAMER_ARCHIVE_TRAILER 

Definition at line 62 of file astreamer.h.

63{
astreamer_archive_context
Definition astreamer.h:63
@ ASTREAMER_MEMBER_HEADER
Definition astreamer.h:65
@ ASTREAMER_MEMBER_CONTENTS
Definition astreamer.h:66
@ ASTREAMER_MEMBER_TRAILER
Definition astreamer.h:67
@ ASTREAMER_ARCHIVE_TRAILER
Definition astreamer.h:68
@ ASTREAMER_UNKNOWN
Definition astreamer.h:64

Function Documentation

◆ astreamer_buffer_bytes()

static void astreamer_buffer_bytes ( astreamer * streamer,
const char **  data,
int * len,
int  nbytes 
)
inline static

Definition at line 168 of file astreamer.h.

170{
171 Assert(nbytes <= *len);
172
173 appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
174 *len -= nbytes;
175 *data += nbytes;
176}
#define Assert(condition)
Definition c.h:943
const void size_t len
const void * data
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition stringinfo.c:281
StringInfoData bbs_buffer
Definition astreamer.h:113

References appendBinaryStringInfo(), Assert, astreamer::bbs_buffer, data, and len.

Referenced by astreamer_buffer_until(), and astreamer_tar_parser_content().

◆ astreamer_buffer_until()

static bool astreamer_buffer_until ( astreamer * streamer,
const char **  data,
int * len,
int  target_bytes 
)
inline static

Definition at line 186 of file astreamer.h.

188{
189 int buflen = streamer->bbs_buffer.len;
190
191 if (buflen >= target_bytes)
192 {
193 /* Target length already reached; nothing to do. */
194 return true;
195 }
196
197 if (buflen + *len < target_bytes)
198 {
199 /* Not enough data to reach target length; buffer all of it. */
200 astreamer_buffer_bytes(streamer, data, len, *len);
201 return false;
202 }
203
204 /* Buffer just enough to reach the target length. */
205 astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
206 return true;
207}
static void astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len, int nbytes)
Definition astreamer.h:168
static int fb(int x)

References astreamer_buffer_bytes(), astreamer::bbs_buffer, data, fb(), StringInfoData::len, and len.

Referenced by astreamer_tar_parser_content().

◆ astreamer_content()

static void astreamer_content ( astreamer * streamer,
astreamer_member * member,
const char * data,
int  len,
astreamer_archive_context  context 
)
inline static

Definition at line 137 of file astreamer.h.

140{
141 Assert(streamer != NULL);
142 streamer->bbs_ops->content(streamer, member, data, len, context);
143}
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition astreamer.h:128
const astreamer_ops * bbs_ops
Definition astreamer.h:111

References Assert, astreamer::bbs_ops, astreamer_ops::content, data, fb(), and len.

Referenced by astreamer_inject_file(), astreamer_recovery_injector_content(), astreamer_tar_archiver_content(), astreamer_tar_header(), astreamer_tar_parser_content(), astreamer_tar_parser_finalize(), astreamer_tar_terminator_content(), astreamer_tar_terminator_finalize(), read_archive_file(), ReceiveArchiveStreamChunk(), ReceiveTarCopyChunk(), and verify_tar_file().

◆ astreamer_extractor_new()

astreamer * astreamer_extractor_new ( const char * basepath,
const char *(*)(const char *)  link_map,
void(*)(const char *)  report_output_file 
)
extern

Definition at line 186 of file astreamer_file.c.

189{
190 astreamer_extractor *streamer;
191
192 streamer = palloc0_object(astreamer_extractor);
193 *((const astreamer_ops **) &streamer->base.bbs_ops) =
194 &astreamer_extractor_ops;
195 streamer->basepath = pstrdup(basepath);
196 streamer->link_map = link_map;
197 streamer->report_output_file = report_output_file;
198
199 return &streamer->base;
200}
static const astreamer_ops astreamer_extractor_ops
#define palloc0_object(type)
Definition fe_memutils.h:75
char * pstrdup(const char *in)
Definition mcxt.c:1781
const char *(* link_map)(const char *)
void(* report_output_file)(const char *)

References astreamer_extractor_ops, astreamer_extractor::base, astreamer_extractor::basepath, astreamer::bbs_ops, astreamer_extractor::link_map, palloc0_object, pstrdup(), and astreamer_extractor::report_output_file.

Referenced by CreateBackupStreamer().

◆ astreamer_finalize()

◆ astreamer_free()

◆ astreamer_gzip_decompressor_new()

astreamer * astreamer_gzip_decompressor_new ( astreamer * next)
extern

Definition at line 236 of file astreamer_gzip.c.

237{
238#ifdef HAVE_LIBZ
240 z_stream *zs;
241
242 Assert(next != NULL);
243
245 *((const astreamer_ops **) &streamer->base.bbs_ops) =
247
248 streamer->base.bbs_next = next;
249 initStringInfo(&streamer->base.bbs_buffer);
250 /* Use a buffer size comparable to the other decompressors */
251 enlargeStringInfo(&streamer->base.bbs_buffer, 256 * 1024 - 1);
252
253 /* Initialize internal stream state for decompression */
254 zs = &streamer->zstream;
255 zs->zalloc = gzip_palloc;
256 zs->zfree = gzip_pfree;
257 zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
258 zs->avail_out = streamer->base.bbs_buffer.maxlen;
259
260 /*
261 * Data compression was initialized using deflateInit2 to request a gzip
262 * header. Similarly, we are using inflateInit2 to initialize data
263 * decompression.
264 *
265 * Per the documentation for inflateInit2, the second argument is
266 * "windowBits" and its value must be greater than or equal to the value
267 * provided while compressing the data, so we are using the maximum
268 * possible value for safety.
269 */
270 if (inflateInit2(zs, 15 + 16) != Z_OK)
271 pg_fatal("could not initialize compression library");
272
273 return &streamer->base;
274#else
275 pg_fatal("this build does not support compression with %s", "gzip");
276 return NULL; /* keep compiler quiet */
277#endif
278}
static int32 next
Definition blutils.c:225
uint8_t uint8
Definition c.h:622
#define pg_fatal(...)
void enlargeStringInfo(StringInfo str, int needed)
Definition stringinfo.c:337
void initStringInfo(StringInfo str)
Definition stringinfo.c:97

References Assert, enlargeStringInfo(), fb(), initStringInfo(), next, palloc0_object, and pg_fatal.

Referenced by create_archive_verifier(), CreateBackupStreamer(), and init_archive_reader().

◆ astreamer_gzip_writer_new()

astreamer * astreamer_gzip_writer_new ( char * pathname,
FILE * file,
pg_compress_specification * compress 
)
extern

Definition at line 99 of file astreamer_gzip.c.

101{
102#ifdef HAVE_LIBZ
103 astreamer_gzip_writer *streamer;
104
106 *((const astreamer_ops **) &streamer->base.bbs_ops) =
108
109 streamer->pathname = pstrdup(pathname);
110
111 if (file == NULL)
112 {
113 streamer->gzfile = gzopen(pathname, "wb");
114 if (streamer->gzfile == NULL)
115 pg_fatal("could not create compressed file \"%s\": %m",
116 pathname);
117 }
118 else
119 {
120 /*
121 * We must dup the file handle so that gzclose doesn't break the
122 * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123 */
124 int fd = dup(fileno(file));
125
126 if (fd < 0)
127 pg_fatal("could not duplicate stdout: %m");
128
129 streamer->gzfile = gzdopen(fd, "wb");
130 if (streamer->gzfile == NULL)
131 pg_fatal("could not open output file: %m");
132 }
133
134 if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
135 pg_fatal("could not set compression level %d: %s",
136 compress->level, get_gz_error(streamer->gzfile));
137
138 return &streamer->base;
139#else
140 pg_fatal("this build does not support compression with %s", "gzip");
141 return NULL; /* keep compiler quiet */
142#endif
143}
static int fd(const char *x, int i)

References fb(), fd(), pg_compress_specification::level, palloc0_object, pg_fatal, and pstrdup().

Referenced by CreateBackupStreamer().

◆ astreamer_lz4_compressor_new()

astreamer * astreamer_lz4_compressor_new ( astreamer * next,
pg_compress_specification * compress 
)
extern

Definition at line 72 of file astreamer_lz4.c.

73{
74#ifdef USE_LZ4
75 astreamer_lz4_frame *streamer;
78
79 Assert(next != NULL);
80
82 *((const astreamer_ops **) &streamer->base.bbs_ops) =
84
85 streamer->base.bbs_next = next;
86 initStringInfo(&streamer->base.bbs_buffer);
87 streamer->header_written = false;
88
89 /* Initialize stream compression preferences */
90 prefs = &streamer->prefs;
91 memset(prefs, 0, sizeof(LZ4F_preferences_t));
92 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93 prefs->compressionLevel = compress->level;
94
97 pg_fatal("could not create lz4 compression context: %s",
99
100 return &streamer->base;
101#else
102 pg_fatal("this build does not support compression with %s", "LZ4");
103 return NULL; /* keep compiler quiet */
104#endif
105}

References Assert, fb(), initStringInfo(), pg_compress_specification::level, next, palloc0_object, and pg_fatal.

Referenced by CreateBackupStreamer().

◆ astreamer_lz4_decompressor_new()

astreamer * astreamer_lz4_decompressor_new ( astreamer * next)
extern

Definition at line 277 of file astreamer_lz4.c.

278{
279#ifdef USE_LZ4
280 astreamer_lz4_frame *streamer;
282
283 Assert(next != NULL);
284
286 *((const astreamer_ops **) &streamer->base.bbs_ops) =
288
289 streamer->base.bbs_next = next;
290 initStringInfo(&streamer->base.bbs_buffer);
291 /* Use a buffer size comparable to the compressor's */
292 enlargeStringInfo(&streamer->base.bbs_buffer, 256 * 1024 - 1);
293
294 /* Initialize internal stream state for decompression */
297 pg_fatal("could not initialize compression library: %s",
299
300 return &streamer->base;
301#else
302 pg_fatal("this build does not support compression with %s", "LZ4");
303 return NULL; /* keep compiler quiet */
304#endif
305}

References Assert, enlargeStringInfo(), fb(), initStringInfo(), next, palloc0_object, and pg_fatal.

Referenced by create_archive_verifier(), CreateBackupStreamer(), and init_archive_reader().

◆ astreamer_plain_writer_new()

astreamer * astreamer_plain_writer_new ( char * pathname,
FILE * file 
)
extern

Definition at line 81 of file astreamer_file.c.

82{
83 astreamer_plain_writer *streamer;
84
85 streamer = palloc0_object(astreamer_plain_writer);
86 *((const astreamer_ops **) &streamer->base.bbs_ops) =
87 &astreamer_plain_writer_ops;
88
89 streamer->pathname = pstrdup(pathname);
90 streamer->file = file;
91
92 if (file == NULL)
93 {
94 streamer->file = fopen(pathname, "wb");
95 if (streamer->file == NULL)
96 pg_fatal("could not create file \"%s\": %m", pathname);
97 streamer->should_close_file = true;
98 }
99
100 return &streamer->base;
101}
static const astreamer_ops astreamer_plain_writer_ops

References astreamer_plain_writer_ops, astreamer_plain_writer::base, astreamer::bbs_ops, fb(), astreamer_plain_writer::file, palloc0_object, astreamer_plain_writer::pathname, pg_fatal, pstrdup(), and astreamer_plain_writer::should_close_file.

Referenced by CreateBackupStreamer().

◆ astreamer_tar_archiver_new()

astreamer * astreamer_tar_archiver_new ( astreamer * next)
extern

Definition at line 388 of file astreamer_tar.c.

389{
390 astreamer_tar_archiver *streamer;
391
392 streamer = palloc0_object(astreamer_tar_archiver);
393 *((const astreamer_ops **) &streamer->base.bbs_ops) =
394 &astreamer_tar_archiver_ops;
395 streamer->base.bbs_next = next;
396
397 return &streamer->base;
398}
static const astreamer_ops astreamer_tar_archiver_ops
astreamer * bbs_next
Definition astreamer.h:112

References astreamer_tar_archiver_ops, astreamer_tar_archiver::base, astreamer::bbs_next, astreamer::bbs_ops, next, and palloc0_object.

Referenced by CreateBackupStreamer().

◆ astreamer_tar_parser_new()

astreamer * astreamer_tar_parser_new ( astreamer * next)
extern

◆ astreamer_tar_terminator_new()

astreamer * astreamer_tar_terminator_new ( astreamer * next)
extern

Definition at line 494 of file astreamer_tar.c.

495{
496 astreamer *streamer;
497
498 streamer = palloc0_object(astreamer);
499 *((const astreamer_ops **) &streamer->bbs_ops) =
500 &astreamer_tar_terminator_ops;
501 streamer->bbs_next = next;
502
503 return streamer;
504}
static const astreamer_ops astreamer_tar_terminator_ops

References astreamer_tar_terminator_ops, astreamer::bbs_next, astreamer::bbs_ops, next, and palloc0_object.

Referenced by CreateBackupStreamer().

◆ astreamer_zstd_compressor_new()

astreamer * astreamer_zstd_compressor_new ( astreamer * next,
pg_compress_specification * compress 
)
extern

Definition at line 70 of file astreamer_zstd.c.

71{
72#ifdef USE_ZSTD
73 astreamer_zstd_frame *streamer;
74 size_t ret;
75
76 Assert(next != NULL);
77
79
80 *((const astreamer_ops **) &streamer->base.bbs_ops) =
82
83 streamer->base.bbs_next = next;
84 initStringInfo(&streamer->base.bbs_buffer);
85 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_CStreamOutSize());
86
87 streamer->cctx = ZSTD_createCCtx();
88 if (!streamer->cctx)
89 pg_fatal("could not create zstd compression context");
90
91 /* Set compression level */
93 compress->level);
94 if (ZSTD_isError(ret))
95 pg_fatal("could not set zstd compression level to %d: %s",
96 compress->level, ZSTD_getErrorName(ret));
97
98 /* Set # of workers, if specified */
99 if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100 {
101 /*
102 * On older versions of libzstd, this option does not exist, and
103 * trying to set it will fail. Similarly for newer versions if they
104 * are compiled without threading support.
105 */
106 ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107 compress->workers);
108 if (ZSTD_isError(ret))
109 pg_fatal("could not set compression worker count to %d: %s",
110 compress->workers, ZSTD_getErrorName(ret));
111 }
112
113 if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114 {
115 ret = ZSTD_CCtx_setParameter(streamer->cctx,
117 compress->long_distance);
118 if (ZSTD_isError(ret))
119 pg_fatal("could not enable long-distance mode: %s",
120 ZSTD_getErrorName(ret));
121 }
122
123 /* Initialize the ZSTD output buffer. */
124 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
125 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
126 streamer->zstd_outBuf.pos = 0;
127
128 return &streamer->base;
129#else
130 pg_fatal("this build does not support compression with %s", "ZSTD");
131 return NULL; /* keep compiler quiet */
132#endif
133}
#define PG_COMPRESSION_OPTION_WORKERS
Definition compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition compression.h:30

References Assert, enlargeStringInfo(), fb(), initStringInfo(), pg_compress_specification::level, pg_compress_specification::long_distance, next, pg_compress_specification::options, palloc0_object, PG_COMPRESSION_OPTION_LONG_DISTANCE, PG_COMPRESSION_OPTION_WORKERS, pg_fatal, and pg_compress_specification::workers.

Referenced by CreateBackupStreamer().

◆ astreamer_zstd_decompressor_new()

astreamer * astreamer_zstd_decompressor_new ( astreamer * next)
extern

Definition at line 259 of file astreamer_zstd.c.

260{
261#ifdef USE_ZSTD
262 astreamer_zstd_frame *streamer;
263
264 Assert(next != NULL);
265
267 *((const astreamer_ops **) &streamer->base.bbs_ops) =
269
270 streamer->base.bbs_next = next;
271 initStringInfo(&streamer->base.bbs_buffer);
272 enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
273
274 streamer->dctx = ZSTD_createDCtx();
275 if (!streamer->dctx)
276 pg_fatal("could not create zstd decompression context");
277
278 /* Initialize the ZSTD output buffer. */
279 streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
280 streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
281 streamer->zstd_outBuf.pos = 0;
282
283 return &streamer->base;
284#else
285 pg_fatal("this build does not support compression with %s", "ZSTD");
286 return NULL; /* keep compiler quiet */
287#endif
288}

References Assert, enlargeStringInfo(), fb(), initStringInfo(), next, palloc0_object, and pg_fatal.

Referenced by create_archive_verifier(), CreateBackupStreamer(), and init_archive_reader().