compression: make parallel sink separate class

This commit is contained in:
Will Dietz 2018-02-11 13:23:31 -06:00
parent a0bdc96726
commit c6209030c4

View file

@ -190,33 +190,9 @@ struct XzSink : CompressionSink
lzma_stream strm = LZMA_STREAM_INIT; lzma_stream strm = LZMA_STREAM_INIT;
bool finished = false; bool finished = false;
XzSink(Sink & nextSink, const bool parallel) : nextSink(nextSink) template <typename F>
{ XzSink(Sink & nextSink, F&& initEncoder) : nextSink(nextSink) {
lzma_ret ret; lzma_ret ret = initEncoder();
if (parallel) {
#ifdef HAVE_LZMA_MT
lzma_mt mt_options = {};
mt_options.flags = 0;
mt_options.timeout = 300; // Using the same setting as the xz cmd line
mt_options.preset = LZMA_PRESET_DEFAULT;
mt_options.filters = NULL;
mt_options.check = LZMA_CHECK_CRC64;
mt_options.threads = lzma_cputhreads();
mt_options.block_size = 0;
if (mt_options.threads == 0)
mt_options.threads = 1;
// FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
// number of threads.
ret = lzma_stream_encoder_mt(
&strm, &mt_options);
} else
#else
printMsg(lvlError, "Warning: parallel XZ compression requested but not supported, falling back to single-threaded compression");
}
#endif
ret = lzma_easy_encoder(
&strm, 6, LZMA_CHECK_CRC64);
if (ret != LZMA_OK) if (ret != LZMA_OK)
throw CompressionError("unable to initialise lzma encoder"); throw CompressionError("unable to initialise lzma encoder");
// FIXME: apply the x86 BCJ filter? // FIXME: apply the x86 BCJ filter?
@ -224,6 +200,9 @@ struct XzSink : CompressionSink
strm.next_out = outbuf; strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf); strm.avail_out = sizeof(outbuf);
} }
XzSink(Sink & nextSink) : XzSink(nextSink, [this]() {
return lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64);
}) {}
~XzSink() ~XzSink()
{ {
@ -277,6 +256,27 @@ struct XzSink : CompressionSink
} }
}; };
#ifdef HAVE_LZMA_MT
struct ParallelXzSink : public XzSink
{
ParallelXzSink(Sink &nextSink) : XzSink(nextSink, [this]() {
lzma_mt mt_options = {};
mt_options.flags = 0;
mt_options.timeout = 300; // Using the same setting as the xz cmd line
mt_options.preset = LZMA_PRESET_DEFAULT;
mt_options.filters = NULL;
mt_options.check = LZMA_CHECK_CRC64;
mt_options.threads = lzma_cputhreads();
mt_options.block_size = 0;
if (mt_options.threads == 0)
mt_options.threads = 1;
// FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the
// number of threads.
return lzma_stream_encoder_mt(&strm, &mt_options);
}) {}
};
#endif
struct BzipSink : CompressionSink struct BzipSink : CompressionSink
{ {
Sink & nextSink; Sink & nextSink;
@ -475,13 +475,18 @@ struct BrotliSink : CompressionSink
ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel) ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel)
{ {
if (parallel && method != "xz") if (parallel) {
#ifdef HAVE_LZMA_MT
if (method == "xz")
return make_ref<ParallelXzSink>(nextSink);
#endif
printMsg(lvlError, format("Warning: parallel compression requested but not supported for method '%1%', falling back to single-threaded compression") % method); printMsg(lvlError, format("Warning: parallel compression requested but not supported for method '%1%', falling back to single-threaded compression") % method);
}
if (method == "none") if (method == "none")
return make_ref<NoneSink>(nextSink); return make_ref<NoneSink>(nextSink);
else if (method == "xz") else if (method == "xz")
return make_ref<XzSink>(nextSink, parallel); return make_ref<XzSink>(nextSink);
else if (method == "bzip2") else if (method == "bzip2")
return make_ref<BzipSink>(nextSink); return make_ref<BzipSink>(nextSink);
else if (method == "br") else if (method == "br")