/*
 * Fragment of the bbsink_lz4 struct definition. Only the two LZ4-specific
 * members below are visible here; the other members used elsewhere in this
 * file (base, compresslevel, bytes_written) are declared on lines not shown.
 */
23 typedef struct bbsink_lz4
/*
 * LZ4 frame compression context. Created in bbsink_lz4_begin_archive and
 * released with LZ4F_freeCompressionContext in bbsink_lz4_end_archive and
 * bbsink_lz4_cleanup.
 */
31 LZ4F_compressionContext_t ctx;
/*
 * LZ4 frame preferences. Filled in by bbsink_lz4_begin_backup and passed to
 * LZ4F_compressBound when sizing output.
 */
32 LZ4F_preferences_t prefs;
/*
 * Forward declarations of the file-local (static) callbacks defined further
 * down. Every callback receives the sink; begin_archive additionally takes
 * the archive name, and the two *_contents callbacks take a byte count.
 */
38 static void bbsink_lz4_begin_backup(
bbsink *sink);
39 static void bbsink_lz4_begin_archive(
bbsink *sink,
const char *archive_name);
40 static void bbsink_lz4_archive_contents(
bbsink *sink,
size_t avail_in);
41 static void bbsink_lz4_manifest_contents(
bbsink *sink,
size_t len);
42 static void bbsink_lz4_end_archive(
bbsink *sink);
43 static void bbsink_lz4_cleanup(
bbsink *sink);
/*
 * Visible entries of a designated-initializer callback table; the table's
 * declaration and its remaining entries are on lines not shown. Each entry
 * below binds one callback slot to the bbsink_lz4_* function of the same
 * name.
 * NOTE(review): presumably this is the bbsink_lz4_ops table whose address is
 * stored into sink->base.bbs_ops -- confirm against the full file.
 */
47 .begin_archive = bbsink_lz4_begin_archive,
48 .archive_contents = bbsink_lz4_archive_contents,
49 .end_archive = bbsink_lz4_end_archive,
51 .manifest_contents = bbsink_lz4_manifest_contents,
54 .cleanup = bbsink_lz4_cleanup
/*
 * Arguments of an error report: SQLSTATE ERRCODE_FEATURE_NOT_SUPPORTED with
 * a message saying lz4 compression is unavailable in this build.
 * NOTE(review): the reporting call itself and the condition under which it
 * is reached are not visible here -- presumably the no-LZ4 build path of
 * bbsink_lz4_new; confirm.
 */
66 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
67 errmsg(
"lz4 compression is not supported by this build")));
/* Allocate a zero-filled bbsink_lz4. */
78 sink =
palloc0(
sizeof(bbsink_lz4));
/*
 * Install the callback table. The store goes through a cast of the address
 * of bbs_ops to "const bbsink_ops **".
 * NOTE(review): presumably bbs_ops is declared so that a plain assignment
 * would not compile -- confirm.
 */
79 *((
const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_lz4_ops;
/* Chain this sink in front of the caller-supplied next sink. */
80 sink->base.bbs_next =
next;
/*
 * bbsink_lz4_begin_backup (fragment: the return type, braces and several
 * statements are on lines not shown).
 *
 * Sets up the LZ4 frame preferences stored in the sink, allocates this
 * sink's own buffer, and computes a size bound for compressed output.
 */
93 bbsink_lz4_begin_backup(
bbsink *sink)
95 bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
96 size_t output_buffer_bound;
97 LZ4F_preferences_t *prefs = &mysink->prefs;
/*
 * Start from all-zero preferences, then set the block size ID to
 * LZ4F_max256KB and the compression level to the one stored in the sink.
 */
100 memset(prefs, 0,
sizeof(LZ4F_preferences_t));
101 prefs->frameInfo.blockSizeID = LZ4F_max256KB;
102 prefs->compressionLevel = mysink->compresslevel;
/* Allocate this sink's buffer of bbs_buffer_length bytes. */
108 mysink->base.bbs_buffer =
palloc(mysink->base.bbs_buffer_length);
/*
 * Ask LZ4 for an output-size bound for one full buffer of input. The
 * remaining argument(s) of this call are on a line not shown.
 */
116 output_buffer_bound = LZ4F_compressBound(mysink->base.bbs_buffer_length,
/*
 * Move the bound up to a multiple of BLCKSZ. A bound that is already an
 * exact multiple is still increased by one full BLCKSZ, since the
 * subtracted remainder is then zero.
 */
122 output_buffer_bound = output_buffer_bound + BLCKSZ -
123 (output_buffer_bound % BLCKSZ);
/*
 * bbsink_lz4_begin_archive (fragment: the return type, braces and several
 * statements are on lines not shown).
 *
 * Creates the LZ4 compression context, writes the LZ4 frame header into the
 * next sink's buffer, and builds the archive name with a ".lz4" suffix.
 */
132 bbsink_lz4_begin_archive(
bbsink *sink,
const char *archive_name)
134 bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
135 char *lz4_archive_name;
136 LZ4F_errorCode_t ctxError;
/* Create the compression context; raise an ERROR if LZ4 reports failure. */
139 ctxError = LZ4F_createCompressionContext(&mysink->ctx, LZ4F_VERSION);
140 if (LZ4F_isError(ctxError))
141 elog(
ERROR,
"could not create lz4 compression context: %s",
142 LZ4F_getErrorName(ctxError));
/*
 * Begin the frame, writing its header at the start of the next sink's
 * buffer. The remaining argument(s) of this call are on a line not shown.
 * NOTE(review): the declaration of headerSize is not among the visible
 * lines.
 */
145 headerSize = LZ4F_compressBegin(mysink->ctx,
146 mysink->base.bbs_next->bbs_buffer,
147 mysink->base.bbs_next->bbs_buffer_length,
150 if (LZ4F_isError(headerSize))
151 elog(
ERROR,
"could not write lz4 header: %s",
152 LZ4F_getErrorName(headerSize));
/* Count the header bytes as already placed in the next sink's buffer. */
159 mysink->bytes_written += headerSize;
/*
 * Build "<archive_name>.lz4" in freshly allocated memory and release it
 * below. NOTE(review): the statement that uses the name in between is not
 * visible here.
 */
162 lz4_archive_name =
psprintf(
"%s.lz4", archive_name);
165 pfree(lz4_archive_name);
/*
 * bbsink_lz4_archive_contents (fragment: the return type, braces and
 * several statements are on lines not shown).
 *
 * Compresses avail_in bytes from this sink's buffer into the next sink's
 * buffer, appending at offset bytes_written.
 */
180 bbsink_lz4_archive_contents(
bbsink *sink,
size_t avail_in)
182 bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
183 size_t compressedSize;
184 size_t avail_in_bound;
/* Output-size bound reported by LZ4 for avail_in bytes of input. */
186 avail_in_bound = LZ4F_compressBound(avail_in, &mysink->prefs);
/*
 * Compare the space left in the next sink's buffer against a threshold (the
 * right-hand side of the comparison is on a line not shown); within the
 * visible part of that branch the write offset is reset to zero.
 * NOTE(review): presumably the staged bytes are handed to the next sink
 * before the reset -- confirm against the full file.
 */
193 if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
197 mysink->bytes_written = 0;
/*
 * Compress from this sink's buffer into the next sink's buffer, starting at
 * the current write offset and limited to the space remaining after it.
 * Trailing arguments of the call are on lines not shown.
 */
203 compressedSize = LZ4F_compressUpdate(mysink->ctx,
204 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
205 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
206 (
uint8 *) mysink->base.bbs_buffer,
210 if (LZ4F_isError(compressedSize))
211 elog(
ERROR,
"could not compress data: %s",
212 LZ4F_getErrorName(compressedSize));
/* Advance the write offset past the bytes LZ4 just produced. */
217 mysink->bytes_written += compressedSize;
/*
 * bbsink_lz4_end_archive (fragment: the return type, braces and several
 * statements are on lines not shown).
 *
 * Finishes the LZ4 frame by writing its closing bytes into the next sink's
 * buffer, then releases the compression context.
 */
228 bbsink_lz4_end_archive(
bbsink *sink)
230 bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
231 size_t compressedSize;
232 size_t lz4_footer_bound;
/* Output-size bound reported by LZ4 for zero bytes of input. */
234 lz4_footer_bound = LZ4F_compressBound(0, &mysink->prefs);
/* The next sink's buffer must be able to hold at least that many bytes. */
236 Assert(mysink->base.bbs_next->bbs_buffer_length >= lz4_footer_bound);
/*
 * Compare the space left in the next sink's buffer against a threshold (the
 * right-hand side is on a line not shown); within the visible part of that
 * branch the write offset is reset to zero.
 * NOTE(review): presumably the staged bytes are handed to the next sink
 * before the reset -- confirm.
 */
238 if ((mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written) <
242 mysink->bytes_written = 0;
/*
 * End the frame, writing at the current offset of the next sink's buffer
 * and limited to the space remaining after it. The trailing argument(s) of
 * the call are on a line not shown.
 */
245 compressedSize = LZ4F_compressEnd(mysink->ctx,
246 mysink->base.bbs_next->bbs_buffer + mysink->bytes_written,
247 mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written,
250 if (LZ4F_isError(compressedSize))
251 elog(
ERROR,
"could not end lz4 compression: %s",
252 LZ4F_getErrorName(compressedSize));
/*
 * Account for the closing bytes, then reset the offset to zero.
 * NOTE(review): the statements between these two assignments are not
 * visible; presumably the buffered data is passed on in between -- confirm.
 */
255 mysink->bytes_written += compressedSize;
259 mysink->bytes_written = 0;
/*
 * Release the compression context.
 * NOTE(review): bbsink_lz4_cleanup also calls LZ4F_freeCompressionContext on
 * mysink->ctx. Whether ctx is cleared after this call, or guarded there, is
 * on lines not shown -- confirm that a double free is impossible.
 */
262 LZ4F_freeCompressionContext(mysink->ctx);
/*
 * bbsink_lz4_manifest_contents: manifest-contents callback taking the sink
 * and a byte count. Only the parameter list is visible here; the return type
 * and body are on lines not shown.
 */
274 bbsink_lz4_manifest_contents(
bbsink *sink,
size_t len)
/*
 * bbsink_lz4_cleanup (fragment: the return type, braces and possibly other
 * statements are on lines not shown).
 *
 * Releases the LZ4 compression context held by the sink.
 */
285 bbsink_lz4_cleanup(
bbsink *sink)
287 bbsink_lz4 *mysink = (bbsink_lz4 *) sink;
/*
 * NOTE(review): bbsink_lz4_end_archive frees the same context. Any guard
 * around this call (e.g. a check that ctx is still set) is not visible here
 * -- confirm.
 */
291 LZ4F_freeCompressionContext(mysink->ctx);
/*
 * Bare signatures and macro heads with no bodies, listed for reference.
 * Several of these names are used by the fragments above: Assert, errcode,
 * errmsg, pfree, palloc0 and psprintf. The macro replacement texts and the
 * function bodies are not shown here.
 */
bbsink * bbsink_lz4_new(bbsink *next, pg_compress_specification *compress)
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
#define Assert(condition)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void pfree(void *pointer)
void * palloc0(Size size)
char * psprintf(const char *fmt,...)
void(* begin_backup)(bbsink *sink)