PostgreSQL Source Code  git master
All Data Structures Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
astreamer.h File Reference
#include "common/compression.h"
#include "lib/stringinfo.h"
#include "pqexpbuffer.h"
Include dependency graph for astreamer.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  astreamer_member
 
struct  astreamer
 
struct  astreamer_ops
 

Typedefs

typedef struct astreamer astreamer
 
typedef struct astreamer_ops astreamer_ops
 

Enumerations

enum  astreamer_archive_context {
  ASTREAMER_UNKNOWN , ASTREAMER_MEMBER_HEADER , ASTREAMER_MEMBER_CONTENTS , ASTREAMER_MEMBER_TRAILER ,
  ASTREAMER_ARCHIVE_TRAILER
}
 

Functions

static void astreamer_content (astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
 
static void astreamer_finalize (astreamer *streamer)
 
static void astreamer_free (astreamer *streamer)
 
static void astreamer_buffer_bytes (astreamer *streamer, const char **data, int *len, int nbytes)
 
static bool astreamer_buffer_until (astreamer *streamer, const char **data, int *len, int target_bytes)
 
astreamer * astreamer_plain_writer_new (char *pathname, FILE *file)
 
astreamer * astreamer_gzip_writer_new (char *pathname, FILE *file, pg_compress_specification *compress)
 
astreamer * astreamer_extractor_new (const char *basepath, const char *(*link_map)(const char *), void(*report_output_file)(const char *))
 
astreamer * astreamer_gzip_decompressor_new (astreamer *next)
 
astreamer * astreamer_lz4_compressor_new (astreamer *next, pg_compress_specification *compress)
 
astreamer * astreamer_lz4_decompressor_new (astreamer *next)
 
astreamer * astreamer_zstd_compressor_new (astreamer *next, pg_compress_specification *compress)
 
astreamer * astreamer_zstd_decompressor_new (astreamer *next)
 
astreamer * astreamer_tar_parser_new (astreamer *next)
 
astreamer * astreamer_tar_terminator_new (astreamer *next)
 
astreamer * astreamer_tar_archiver_new (astreamer *next)
 

Typedef Documentation

◆ astreamer

typedef struct astreamer astreamer

Definition at line 1 of file astreamer.h.

◆ astreamer_ops

typedef struct astreamer_ops astreamer_ops

Definition at line 1 of file astreamer.h.

Enumeration Type Documentation

◆ astreamer_archive_context

Enumerator
ASTREAMER_UNKNOWN 
ASTREAMER_MEMBER_HEADER 
ASTREAMER_MEMBER_CONTENTS 
ASTREAMER_MEMBER_TRAILER 
ASTREAMER_ARCHIVE_TRAILER 

Definition at line 62 of file astreamer.h.

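63 {
64  ASTREAMER_UNKNOWN,
65  ASTREAMER_MEMBER_HEADER,
66  ASTREAMER_MEMBER_CONTENTS,
67  ASTREAMER_MEMBER_TRAILER,
68  ASTREAMER_ARCHIVE_TRAILER,
69 } astreamer_archive_context;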
astreamer_archive_context
Definition: astreamer.h:63
@ ASTREAMER_MEMBER_HEADER
Definition: astreamer.h:65
@ ASTREAMER_MEMBER_CONTENTS
Definition: astreamer.h:66
@ ASTREAMER_MEMBER_TRAILER
Definition: astreamer.h:67
@ ASTREAMER_ARCHIVE_TRAILER
Definition: astreamer.h:68
@ ASTREAMER_UNKNOWN
Definition: astreamer.h:64

Function Documentation

◆ astreamer_buffer_bytes()

static void astreamer_buffer_bytes ( astreamer *  streamer,
const char **  data,
int *  len,
int  nbytes 
)
inline static

Definition at line 166 of file astreamer.h.

168 {
169  Assert(nbytes <= *len);
170 
171  appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
172  *len -= nbytes;
173  *data += nbytes;
174 }
#define Assert(condition)
Definition: c.h:863
const void size_t len
const void * data
void appendBinaryStringInfo(StringInfo str, const void *data, int datalen)
Definition: stringinfo.c:230
StringInfoData bbs_buffer
Definition: astreamer.h:111

References appendBinaryStringInfo(), Assert, astreamer::bbs_buffer, data, and len.

Referenced by astreamer_buffer_until(), and astreamer_tar_parser_content().

◆ astreamer_buffer_until()

static bool astreamer_buffer_until ( astreamer *  streamer,
const char **  data,
int *  len,
int  target_bytes 
)
inline static

Definition at line 184 of file astreamer.h.

186 {
187  int buflen = streamer->bbs_buffer.len;
188 
189  if (buflen >= target_bytes)
190  {
191  /* Target length already reached; nothing to do. */
192  return true;
193  }
194 
195  if (buflen + *len < target_bytes)
196  {
197  /* Not enough data to reach target length; buffer all of it. */
198  astreamer_buffer_bytes(streamer, data, len, *len);
199  return false;
200  }
201 
202  /* Buffer just enough to reach the target length. */
203  astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
204  return true;
205 }
static void astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len, int nbytes)
Definition: astreamer.h:166

References astreamer_buffer_bytes(), astreamer::bbs_buffer, data, StringInfoData::len, and len.

Referenced by astreamer_tar_parser_content().

◆ astreamer_content()

static void astreamer_content ( astreamer *  streamer,
astreamer_member *  member,
const char *  data,
int  len,
astreamer_archive_context  context 
)
inline static

Definition at line 135 of file astreamer.h.

138 {
139  Assert(streamer != NULL);
140  streamer->bbs_ops->content(streamer, member, data, len, context);
141 }
tree context
Definition: radixtree.h:1835
void(* content)(astreamer *streamer, astreamer_member *member, const char *data, int len, astreamer_archive_context context)
Definition: astreamer.h:126
const astreamer_ops * bbs_ops
Definition: astreamer.h:109

References Assert, astreamer::bbs_ops, astreamer_ops::content, context, data, and len.

Referenced by astreamer_inject_file(), astreamer_recovery_injector_content(), astreamer_tar_archiver_content(), astreamer_tar_header(), astreamer_tar_parser_content(), astreamer_tar_parser_finalize(), astreamer_tar_terminator_content(), astreamer_tar_terminator_finalize(), ReceiveArchiveStreamChunk(), ReceiveTarCopyChunk(), and verify_tar_file().

◆ astreamer_extractor_new()

astreamer* astreamer_extractor_new ( const char *  basepath,
const char *(*)(const char *)  link_map,
void(*)(const char *)  report_output_file 
)

Definition at line 186 of file astreamer_file.c.

189 {
190  astreamer_extractor *streamer;
191 
192  streamer = palloc0(sizeof(astreamer_extractor));
193  *((const astreamer_ops **) &streamer->base.bbs_ops) =
194  &astreamer_extractor_ops;
195  streamer->basepath = pstrdup(basepath);
196  streamer->link_map = link_map;
197  streamer->report_output_file = report_output_file;
198 
199  return &streamer->base;
200 }
static const astreamer_ops astreamer_extractor_ops
char * pstrdup(const char *in)
Definition: mcxt.c:1696
void * palloc0(Size size)
Definition: mcxt.c:1347
void(* report_output_file)(const char *)
const char *(* link_map)(const char *)

References astreamer_extractor_ops, astreamer_extractor::base, astreamer_extractor::basepath, astreamer::bbs_ops, astreamer_extractor::link_map, palloc0(), pstrdup(), and astreamer_extractor::report_output_file.

Referenced by CreateBackupStreamer().

◆ astreamer_finalize()

static void astreamer_finalize ( astreamer *  streamer)
inline static

◆ astreamer_free()

static void astreamer_free ( astreamer *  streamer)
inline static

◆ astreamer_gzip_decompressor_new()

astreamer* astreamer_gzip_decompressor_new ( astreamer *  next)

Definition at line 236 of file astreamer_gzip.c.

237 {
238 #ifdef HAVE_LIBZ
239  astreamer_gzip_decompressor *streamer;
240  z_stream *zs;
241 
242  Assert(next != NULL);
243 
244  streamer = palloc0(sizeof(astreamer_gzip_decompressor));
245  *((const astreamer_ops **) &streamer->base.bbs_ops) =
246  &astreamer_gzip_decompressor_ops;
247 
248  streamer->base.bbs_next = next;
249  initStringInfo(&streamer->base.bbs_buffer);
250 
251  /* Initialize internal stream state for decompression */
252  zs = &streamer->zstream;
253  zs->zalloc = gzip_palloc;
254  zs->zfree = gzip_pfree;
255  zs->next_out = (uint8 *) streamer->base.bbs_buffer.data;
256  zs->avail_out = streamer->base.bbs_buffer.maxlen;
257 
258  /*
259  * Data compression was initialized using deflateInit2 to request a gzip
260  * header. Similarly, we are using inflateInit2 to initialize data
261  * decompression.
262  *
263  * Per the documentation for inflateInit2, the second argument is
264  * "windowBits" and its value must be greater than or equal to the value
265  * provided while compressing the data, so we are using the maximum
266  * possible value for safety.
267  */
268  if (inflateInit2(zs, 15 + 16) != Z_OK)
269  pg_fatal("could not initialize compression library");
270 
271  return &streamer->base;
272 #else
273  pg_fatal("this build does not support compression with %s", "gzip");
274  return NULL; /* keep compiler quiet */
275 #endif
276 }
static int32 next
Definition: blutils.c:219
unsigned char uint8
Definition: c.h:516
if(TABLE==NULL||TABLE_index==NULL)
Definition: isn.c:76
#define pg_fatal(...)
void initStringInfo(StringInfo str)
Definition: stringinfo.c:56

References Assert, if(), initStringInfo(), next, palloc0(), and pg_fatal.

Referenced by create_archive_verifier(), and CreateBackupStreamer().

◆ astreamer_gzip_writer_new()

astreamer* astreamer_gzip_writer_new ( char *  pathname,
FILE *  file,
pg_compress_specification *  compress 
)

Definition at line 99 of file astreamer_gzip.c.

101 {
102 #ifdef HAVE_LIBZ
103  astreamer_gzip_writer *streamer;
104 
105  streamer = palloc0(sizeof(astreamer_gzip_writer));
106  *((const astreamer_ops **) &streamer->base.bbs_ops) =
107  &astreamer_gzip_writer_ops;
108 
109  streamer->pathname = pstrdup(pathname);
110 
111  if (file == NULL)
112  {
113  streamer->gzfile = gzopen(pathname, "wb");
114  if (streamer->gzfile == NULL)
115  pg_fatal("could not create compressed file \"%s\": %m",
116  pathname);
117  }
118  else
119  {
120  /*
121  * We must dup the file handle so that gzclose doesn't break the
122  * caller's FILE. See comment for astreamer_gzip_writer_finalize.
123  */
124  int fd = dup(fileno(file));
125 
126  if (fd < 0)
127  pg_fatal("could not duplicate stdout: %m");
128 
129  streamer->gzfile = gzdopen(fd, "wb");
130  if (streamer->gzfile == NULL)
131  pg_fatal("could not open output file: %m");
132  }
133 
134  if (gzsetparams(streamer->gzfile, compress->level, Z_DEFAULT_STRATEGY) != Z_OK)
135  pg_fatal("could not set compression level %d: %s",
136  compress->level, get_gz_error(streamer->gzfile));
137 
138  return &streamer->base;
139 #else
140  pg_fatal("this build does not support compression with %s", "gzip");
141  return NULL; /* keep compiler quiet */
142 #endif
143 }
static int fd(const char *x, int i)
Definition: preproc-init.c:105

References fd(), pg_compress_specification::level, palloc0(), pg_fatal, and pstrdup().

Referenced by CreateBackupStreamer().

◆ astreamer_lz4_compressor_new()

astreamer* astreamer_lz4_compressor_new ( astreamer *  next,
pg_compress_specification *  compress 
)

Definition at line 72 of file astreamer_lz4.c.

73 {
74 #ifdef USE_LZ4
75  astreamer_lz4_frame *streamer;
76  LZ4F_errorCode_t ctxError;
77  LZ4F_preferences_t *prefs;
78 
79  Assert(next != NULL);
80 
81  streamer = palloc0(sizeof(astreamer_lz4_frame));
82  *((const astreamer_ops **) &streamer->base.bbs_ops) =
83  &astreamer_lz4_compressor_ops;
84 
85  streamer->base.bbs_next = next;
86  initStringInfo(&streamer->base.bbs_buffer);
87  streamer->header_written = false;
88 
89  /* Initialize stream compression preferences */
90  prefs = &streamer->prefs;
91  memset(prefs, 0, sizeof(LZ4F_preferences_t));
92  prefs->frameInfo.blockSizeID = LZ4F_max256KB;
93  prefs->compressionLevel = compress->level;
94 
95  ctxError = LZ4F_createCompressionContext(&streamer->cctx, LZ4F_VERSION);
96  if (LZ4F_isError(ctxError))
97  pg_log_error("could not create lz4 compression context: %s",
98  LZ4F_getErrorName(ctxError));
99 
100  return &streamer->base;
101 #else
102  pg_fatal("this build does not support compression with %s", "LZ4");
103  return NULL; /* keep compiler quiet */
104 #endif
105 }
#define pg_log_error(...)
Definition: logging.h:106

References Assert, initStringInfo(), pg_compress_specification::level, next, palloc0(), pg_fatal, and pg_log_error.

Referenced by CreateBackupStreamer().

◆ astreamer_lz4_decompressor_new()

astreamer* astreamer_lz4_decompressor_new ( astreamer *  next)

Definition at line 277 of file astreamer_lz4.c.

278 {
279 #ifdef USE_LZ4
280  astreamer_lz4_frame *streamer;
281  LZ4F_errorCode_t ctxError;
282 
283  Assert(next != NULL);
284 
285  streamer = palloc0(sizeof(astreamer_lz4_frame));
286  *((const astreamer_ops **) &streamer->base.bbs_ops) =
287  &astreamer_lz4_decompressor_ops;
288 
289  streamer->base.bbs_next = next;
290  initStringInfo(&streamer->base.bbs_buffer);
291 
292  /* Initialize internal stream state for decompression */
293  ctxError = LZ4F_createDecompressionContext(&streamer->dctx, LZ4F_VERSION);
294  if (LZ4F_isError(ctxError))
295  pg_fatal("could not initialize compression library: %s",
296  LZ4F_getErrorName(ctxError));
297 
298  return &streamer->base;
299 #else
300  pg_fatal("this build does not support compression with %s", "LZ4");
301  return NULL; /* keep compiler quiet */
302 #endif
303 }

References Assert, initStringInfo(), next, palloc0(), and pg_fatal.

Referenced by create_archive_verifier(), and CreateBackupStreamer().

◆ astreamer_plain_writer_new()

astreamer* astreamer_plain_writer_new ( char *  pathname,
FILE *  file 
)

Definition at line 81 of file astreamer_file.c.

82 {
83  astreamer_plain_writer *streamer;
84 
85  streamer = palloc0(sizeof(astreamer_plain_writer));
86  *((const astreamer_ops **) &streamer->base.bbs_ops) =
87  &astreamer_plain_writer_ops;
88 
89  streamer->pathname = pstrdup(pathname);
90  streamer->file = file;
91 
92  if (file == NULL)
93  {
94  streamer->file = fopen(pathname, "wb");
95  if (streamer->file == NULL)
96  pg_fatal("could not create file \"%s\": %m", pathname);
97  streamer->should_close_file = true;
98  }
99 
100  return &streamer->base;
101 }
static const astreamer_ops astreamer_plain_writer_ops

References astreamer_plain_writer_ops, astreamer_plain_writer::base, astreamer::bbs_ops, astreamer_plain_writer::file, palloc0(), astreamer_plain_writer::pathname, pg_fatal, pstrdup(), and astreamer_plain_writer::should_close_file.

Referenced by CreateBackupStreamer().

◆ astreamer_tar_archiver_new()

astreamer* astreamer_tar_archiver_new ( astreamer *  next)

Definition at line 356 of file astreamer_tar.c.

357 {
358  astreamer_tar_archiver *streamer;
359 
360  streamer = palloc0(sizeof(astreamer_tar_archiver));
361  *((const astreamer_ops **) &streamer->base.bbs_ops) =
362  &astreamer_tar_archiver_ops;
363  streamer->base.bbs_next = next;
364 
365  return &streamer->base;
366 }
static const astreamer_ops astreamer_tar_archiver_ops
Definition: astreamer_tar.c:66
astreamer * bbs_next
Definition: astreamer.h:110

References astreamer_tar_archiver_ops, astreamer_tar_archiver::base, astreamer::bbs_next, astreamer::bbs_ops, next, and palloc0().

Referenced by CreateBackupStreamer().

◆ astreamer_tar_parser_new()

astreamer* astreamer_tar_parser_new ( astreamer *  next)

Definition at line 93 of file astreamer_tar.c.

94 {
95  astreamer_tar_parser *streamer;
96 
97  streamer = palloc0(sizeof(astreamer_tar_parser));
98  *((const astreamer_ops **) &streamer->base.bbs_ops) =
99  &astreamer_tar_parser_ops;
100  streamer->base.bbs_next = next;
101  initStringInfo(&streamer->base.bbs_buffer);
102  streamer->next_context = ASTREAMER_MEMBER_HEADER;
103 
104  return &streamer->base;
105 }
static const astreamer_ops astreamer_tar_parser_ops
Definition: astreamer_tar.c:53
astreamer_archive_context next_context
Definition: astreamer_tar.c:33

References ASTREAMER_MEMBER_HEADER, astreamer_tar_parser_ops, astreamer_tar_parser::base, astreamer::bbs_buffer, astreamer::bbs_next, astreamer::bbs_ops, initStringInfo(), next, astreamer_tar_parser::next_context, and palloc0().

Referenced by create_archive_verifier(), and CreateBackupStreamer().

◆ astreamer_tar_terminator_new()

astreamer* astreamer_tar_terminator_new ( astreamer *  next)

Definition at line 462 of file astreamer_tar.c.

463 {
464  astreamer *streamer;
465 
466  streamer = palloc0(sizeof(astreamer));
467  *((const astreamer_ops **) &streamer->bbs_ops) =
468  &astreamer_tar_terminator_ops;
469  streamer->bbs_next = next;
470 
471  return streamer;
472 }
static const astreamer_ops astreamer_tar_terminator_ops
Definition: astreamer_tar.c:79

References astreamer_tar_terminator_ops, astreamer::bbs_next, astreamer::bbs_ops, next, and palloc0().

Referenced by CreateBackupStreamer().

◆ astreamer_zstd_compressor_new()

astreamer* astreamer_zstd_compressor_new ( astreamer *  next,
pg_compress_specification *  compress 
)

Definition at line 70 of file astreamer_zstd.c.

71 {
72 #ifdef USE_ZSTD
73  astreamer_zstd_frame *streamer;
74  size_t ret;
75 
76  Assert(next != NULL);
77 
78  streamer = palloc0(sizeof(astreamer_zstd_frame));
79 
80  *((const astreamer_ops **) &streamer->base.bbs_ops) =
81  &astreamer_zstd_compressor_ops;
82 
83  streamer->base.bbs_next = next;
84  initStringInfo(&streamer->base.bbs_buffer);
85  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
86 
87  streamer->cctx = ZSTD_createCCtx();
88  if (!streamer->cctx)
89  pg_fatal("could not create zstd compression context");
90 
91  /* Set compression level */
92  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
93  compress->level);
94  if (ZSTD_isError(ret))
95  pg_fatal("could not set zstd compression level to %d: %s",
96  compress->level, ZSTD_getErrorName(ret));
97 
98  /* Set # of workers, if specified */
99  if ((compress->options & PG_COMPRESSION_OPTION_WORKERS) != 0)
100  {
101  /*
102  * On older versions of libzstd, this option does not exist, and
103  * trying to set it will fail. Similarly for newer versions if they
104  * are compiled without threading support.
105  */
106  ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
107  compress->workers);
108  if (ZSTD_isError(ret))
109  pg_fatal("could not set compression worker count to %d: %s",
110  compress->workers, ZSTD_getErrorName(ret));
111  }
112 
113  if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
114  {
115  ret = ZSTD_CCtx_setParameter(streamer->cctx,
116  ZSTD_c_enableLongDistanceMatching,
117  compress->long_distance);
118  if (ZSTD_isError(ret))
119  {
120  pg_log_error("could not enable long-distance mode: %s",
121  ZSTD_getErrorName(ret));
122  exit(1);
123  }
124  }
125 
126  /* Initialize the ZSTD output buffer. */
127  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
128  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
129  streamer->zstd_outBuf.pos = 0;
130 
131  return &streamer->base;
132 #else
133  pg_fatal("this build does not support compression with %s", "ZSTD");
134  return NULL; /* keep compiler quiet */
135 #endif
136 }
#define PG_COMPRESSION_OPTION_WORKERS
Definition: compression.h:29
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
Definition: compression.h:30
exit(1)
void enlargeStringInfo(StringInfo str, int needed)
Definition: stringinfo.c:286

References Assert, enlargeStringInfo(), exit(), initStringInfo(), pg_compress_specification::level, pg_compress_specification::long_distance, next, pg_compress_specification::options, palloc0(), PG_COMPRESSION_OPTION_LONG_DISTANCE, PG_COMPRESSION_OPTION_WORKERS, pg_fatal, pg_log_error, and pg_compress_specification::workers.

Referenced by CreateBackupStreamer().

◆ astreamer_zstd_decompressor_new()

astreamer* astreamer_zstd_decompressor_new ( astreamer *  next)

Definition at line 262 of file astreamer_zstd.c.

263 {
264 #ifdef USE_ZSTD
265  astreamer_zstd_frame *streamer;
266 
267  Assert(next != NULL);
268 
269  streamer = palloc0(sizeof(astreamer_zstd_frame));
270  *((const astreamer_ops **) &streamer->base.bbs_ops) =
271  &astreamer_zstd_decompressor_ops;
272 
273  streamer->base.bbs_next = next;
274  initStringInfo(&streamer->base.bbs_buffer);
275  enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
276 
277  streamer->dctx = ZSTD_createDCtx();
278  if (!streamer->dctx)
279  pg_fatal("could not create zstd decompression context");
280 
281  /* Initialize the ZSTD output buffer. */
282  streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
283  streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
284  streamer->zstd_outBuf.pos = 0;
285 
286  return &streamer->base;
287 #else
288  pg_fatal("this build does not support compression with %s", "ZSTD");
289  return NULL; /* keep compiler quiet */
290 #endif
291 }

References Assert, enlargeStringInfo(), initStringInfo(), next, palloc0(), and pg_fatal.

Referenced by create_archive_verifier(), and CreateBackupStreamer().