23 typedef struct bbsink_zstd
32 ZSTD_outBuffer zstd_outBuf;
35 static void bbsink_zstd_begin_backup(
bbsink *sink);
36 static void bbsink_zstd_begin_archive(
bbsink *sink,
const char *archive_name);
37 static void bbsink_zstd_archive_contents(
bbsink *sink,
size_t len);
38 static void bbsink_zstd_manifest_contents(
bbsink *sink,
size_t len);
39 static void bbsink_zstd_end_archive(
bbsink *sink);
40 static void bbsink_zstd_cleanup(
bbsink *sink);
46 .begin_archive = bbsink_zstd_begin_archive,
47 .archive_contents = bbsink_zstd_archive_contents,
48 .end_archive = bbsink_zstd_end_archive,
50 .manifest_contents = bbsink_zstd_manifest_contents,
52 .end_backup = bbsink_zstd_end_backup,
53 .cleanup = bbsink_zstd_cleanup
65 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
66 errmsg(
"zstd compression is not supported by this build")));
73 sink =
palloc0(
sizeof(bbsink_zstd));
74 *((
const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
75 sink->base.bbs_next =
next;
76 sink->compress = compress;
88 bbsink_zstd_begin_backup(
bbsink *sink)
90 bbsink_zstd *mysink = (bbsink_zstd *) sink;
91 size_t output_buffer_bound;
95 mysink->cctx = ZSTD_createCCtx();
97 elog(
ERROR,
"could not create zstd compression context");
99 ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
101 if (ZSTD_isError(ret))
102 elog(
ERROR,
"could not set zstd compression level to %d: %s",
103 compress->
level, ZSTD_getErrorName(ret));
112 ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
114 if (ZSTD_isError(ret))
116 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
117 errmsg(
"could not set compression worker count to %d: %s",
118 compress->
workers, ZSTD_getErrorName(ret)));
123 ret = ZSTD_CCtx_setParameter(mysink->cctx,
124 ZSTD_c_enableLongDistanceMatching,
126 if (ZSTD_isError(ret))
128 errcode(ERRCODE_INVALID_PARAMETER_VALUE),
129 errmsg(
"could not enable long-distance mode: %s",
130 ZSTD_getErrorName(ret)));
137 mysink->base.bbs_buffer =
palloc(mysink->base.bbs_buffer_length);
143 output_buffer_bound = ZSTD_compressBound(mysink->base.bbs_buffer_length);
148 output_buffer_bound = output_buffer_bound + BLCKSZ -
149 (output_buffer_bound % BLCKSZ);
158 bbsink_zstd_begin_archive(
bbsink *sink,
const char *archive_name)
160 bbsink_zstd *mysink = (bbsink_zstd *) sink;
161 char *zstd_archive_name;
168 ZSTD_CCtx_reset(mysink->cctx, ZSTD_reset_session_only);
170 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
171 mysink->zstd_outBuf.size = mysink->base.bbs_next->bbs_buffer_length;
172 mysink->zstd_outBuf.pos = 0;
175 zstd_archive_name =
psprintf(
"%s.zst", archive_name);
178 pfree(zstd_archive_name);
193 bbsink_zstd_archive_contents(
bbsink *sink,
size_t len)
195 bbsink_zstd *mysink = (bbsink_zstd *) sink;
196 ZSTD_inBuffer inBuf = {mysink->base.bbs_buffer,
len, 0};
198 while (inBuf.pos < inBuf.size)
201 size_t max_needed = ZSTD_compressBound(inBuf.size - inBuf.pos);
207 if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
210 mysink->zstd_outBuf.pos);
211 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
212 mysink->zstd_outBuf.size =
213 mysink->base.bbs_next->bbs_buffer_length;
214 mysink->zstd_outBuf.pos = 0;
217 yet_to_flush = ZSTD_compressStream2(mysink->cctx, &mysink->zstd_outBuf,
218 &inBuf, ZSTD_e_continue);
220 if (ZSTD_isError(yet_to_flush))
222 "could not compress data: %s",
223 ZSTD_getErrorName(yet_to_flush));
235 bbsink_zstd_end_archive(
bbsink *sink)
237 bbsink_zstd *mysink = (bbsink_zstd *) sink;
242 ZSTD_inBuffer in = {NULL, 0, 0};
243 size_t max_needed = ZSTD_compressBound(0);
249 if (mysink->zstd_outBuf.size - mysink->zstd_outBuf.pos < max_needed)
252 mysink->zstd_outBuf.pos);
253 mysink->zstd_outBuf.dst = mysink->base.bbs_next->bbs_buffer;
254 mysink->zstd_outBuf.size =
255 mysink->base.bbs_next->bbs_buffer_length;
256 mysink->zstd_outBuf.pos = 0;
259 yet_to_flush = ZSTD_compressStream2(mysink->cctx,
260 &mysink->zstd_outBuf,
263 if (ZSTD_isError(yet_to_flush))
264 elog(
ERROR,
"could not compress data: %s",
265 ZSTD_getErrorName(yet_to_flush));
267 }
while (yet_to_flush > 0);
270 if (mysink->zstd_outBuf.pos > 0)
272 mysink->zstd_outBuf.pos);
285 bbsink_zstd *mysink = (bbsink_zstd *) sink;
290 ZSTD_freeCCtx(mysink->cctx);
302 bbsink_zstd_manifest_contents(
bbsink *sink,
size_t len)
313 bbsink_zstd_cleanup(
bbsink *sink)
315 bbsink_zstd *mysink = (bbsink_zstd *) sink;
320 ZSTD_freeCCtx(mysink->cctx);
void bbsink_forward_begin_manifest(bbsink *sink)
void bbsink_forward_end_backup(bbsink *sink, XLogRecPtr endptr, TimeLineID endtli)
void bbsink_forward_end_archive(bbsink *sink)
void bbsink_forward_end_manifest(bbsink *sink)
static void bbsink_begin_backup(bbsink *sink, bbsink_state *state, int buffer_length)
static void bbsink_begin_archive(bbsink *sink, const char *archive_name)
static void bbsink_archive_contents(bbsink *sink, size_t len)
static void bbsink_manifest_contents(bbsink *sink, size_t len)
bbsink * bbsink_zstd_new(bbsink *next, pg_compress_specification *compress)
#define Assert(condition)
#define PG_COMPRESSION_OPTION_WORKERS
#define PG_COMPRESSION_OPTION_LONG_DISTANCE
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void pfree(void *pointer)
void * palloc0(Size size)
char * psprintf(const char *fmt,...)
void(* begin_backup)(bbsink *sink)