diff --git a/src/libstore/filetransfer.cc b/src/libstore/filetransfer.cc index 8ea5cdc9d..514ab3bf9 100644 --- a/src/libstore/filetransfer.cc +++ b/src/libstore/filetransfer.cc @@ -148,7 +148,7 @@ struct curlFileTransfer : public FileTransfer } LambdaSink finalSink; - std::shared_ptr decompressionSink; + std::shared_ptr decompressionSink; std::optional errorSink; std::exception_ptr writeException; diff --git a/src/libutil/compression.cc b/src/libutil/compression.cc index 986ba2976..8ba536000 100644 --- a/src/libutil/compression.cc +++ b/src/libutil/compression.cc @@ -1,10 +1,13 @@ #include "compression.hh" +#include "tarfile.hh" #include "util.hh" #include "finally.hh" #include "logging.hh" #include #include +#include +#include #include #include @@ -35,6 +38,80 @@ struct ChunkedCompressionSink : CompressionSink virtual void writeInternal(std::string_view data) = 0; }; +struct ArchiveDecompressionSource : Source +{ + std::unique_ptr archive = 0; + Source & src; + ArchiveDecompressionSource(Source & src) : src(src) {} + ~ArchiveDecompressionSource() override {} + size_t read(char * data, size_t len) override { + struct archive_entry* ae; + if (!archive) { + archive = std::make_unique(src, true); + this->archive->check(archive_read_next_header(this->archive->archive, &ae), "Failed to read header (%s)"); + if (archive_filter_count(this->archive->archive) < 2) { + throw CompressionError("Input compression not recognized."); + } + } + ssize_t result = archive_read_data(this->archive->archive, data, len); + if (result > 0) return result; + if (result == 0) { + throw EndOfFile("reached end of compressed file"); + } + this->archive->check(result, "Failed to read compressed data (%s)"); + return result; + } +}; +struct ArchiveCompressionSink : CompressionSink +{ + Sink & nextSink; + struct archive* archive; + ArchiveCompressionSink(Sink & nextSink, std::string format, bool parallel) : nextSink(nextSink) { + archive = archive_write_new(); + if (!archive) throw Error("failed to initialize libarchive"); + check(archive_write_add_filter_by_name(archive, format.c_str()), "Couldn't initialize compression (%s)"); + check(archive_write_set_format_raw(archive)); + if (format == "xz" && parallel) { + check(archive_write_set_filter_option(archive, format.c_str(), "threads", "0")); + } + // disable internal buffering + check(archive_write_set_bytes_per_block(archive, 0)); + // disable output padding + check(archive_write_set_bytes_in_last_block(archive, 1)); + open(); + } + ~ArchiveCompressionSink() override { + if (archive) archive_write_free(archive); + } + void finish() override { + flush(); + check(archive_write_close(archive)); + } + void check(int err, const char *reason="Failed to compress (%s)") { + if (err == ARCHIVE_EOF) + throw EndOfFile("reached end of archive"); + else if (err != ARCHIVE_OK) + throw Error(reason, archive_error_string(this->archive)); + } + void write(std::string_view data) override { + ssize_t result = archive_write_data(archive, data.data(), data.length()); + if (result <= 0) check(result); + } +private: + void open() { + check(archive_write_open(archive, this, NULL, ArchiveCompressionSink::callback_write, NULL)); + struct archive_entry *ae = archive_entry_new(); + archive_entry_set_filetype(ae, AE_IFREG); + check(archive_write_header(archive, ae)); + archive_entry_free(ae); + } + static ssize_t callback_write(struct archive *archive, void *_self, const void *buffer, size_t length) { + ArchiveCompressionSink *self = (ArchiveCompressionSink *)_self; + self->nextSink({(const char*)buffer, length}); + return length; + } +}; + struct NoneSink : CompressionSink { Sink & nextSink; @@ -43,171 +120,6 @@ struct NoneSink : CompressionSink void write(std::string_view data) override { nextSink(data); } }; -struct GzipDecompressionSink : CompressionSink -{ - Sink & nextSink; - z_stream strm; - bool finished = false; - uint8_t outbuf[BUFSIZ]; - - GzipDecompressionSink(Sink & nextSink) : nextSink(nextSink) - { - strm.zalloc = Z_NULL; - strm.zfree = Z_NULL; - strm.opaque = Z_NULL; - strm.avail_in = 0; - strm.next_in = Z_NULL; - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - - // Enable gzip and zlib decoding (+32) with 15 windowBits - int ret = inflateInit2(&strm,15+32); - if (ret != Z_OK) - throw CompressionError("unable to initialise gzip encoder"); - } - - ~GzipDecompressionSink() - { - inflateEnd(&strm); - } - - void finish() override - { - CompressionSink::flush(); - write({}); - } - - void write(std::string_view data) override - { - assert(data.size() <= std::numeric_limits::max()); - - strm.next_in = (Bytef *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - int ret = inflate(&strm,Z_SYNC_FLUSH); - if (ret != Z_OK && ret != Z_STREAM_END) - throw CompressionError("error while decompressing gzip file: %d (%d, %d)", - zError(ret), data.size(), strm.avail_in); - - finished = ret == Z_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (Bytef *) outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - -struct XzDecompressionSink : CompressionSink -{ - Sink & nextSink; - uint8_t outbuf[BUFSIZ]; - lzma_stream strm = LZMA_STREAM_INIT; - bool finished = false; - - XzDecompressionSink(Sink & nextSink) : nextSink(nextSink) - { - lzma_ret ret = lzma_stream_decoder( - &strm, UINT64_MAX, LZMA_CONCATENATED); - if (ret != LZMA_OK) - throw CompressionError("unable to initialise lzma decoder"); - - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~XzDecompressionSink() - { - lzma_end(&strm); - } - - void finish() override - { - CompressionSink::flush(); - write({}); - } - - void write(std::string_view data) override - { - strm.next_in = (const unsigned char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - lzma_ret ret = lzma_code(&strm, data.data() ? LZMA_RUN : LZMA_FINISH); - if (ret != LZMA_OK && ret != LZMA_STREAM_END) - throw CompressionError("error %d while decompressing xz file", ret); - - finished = ret == LZMA_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - -struct BzipDecompressionSink : ChunkedCompressionSink -{ - Sink & nextSink; - bz_stream strm; - bool finished = false; - - BzipDecompressionSink(Sink & nextSink) : nextSink(nextSink) - { - memset(&strm, 0, sizeof(strm)); - int ret = BZ2_bzDecompressInit(&strm, 0, 0); - if (ret != BZ_OK) - throw CompressionError("unable to initialise bzip2 decoder"); - - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~BzipDecompressionSink() - { - BZ2_bzDecompressEnd(&strm); - } - - void finish() override - { - flush(); - write({}); - } - - void writeInternal(std::string_view data) override - { - assert(data.size() <= std::numeric_limits::max()); - - strm.next_in = (char *) data.data(); - strm.avail_in = data.size(); - - while (strm.avail_in) { - checkInterrupt(); - - int ret = BZ2_bzDecompress(&strm); - if (ret != BZ_OK && ret != BZ_STREAM_END) - throw CompressionError("error while decompressing bzip2 file"); - - finished = ret == BZ_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - struct BrotliDecompressionSink : ChunkedCompressionSink { Sink & nextSink; @@ -261,161 +173,32 @@ struct BrotliDecompressionSink : ChunkedCompressionSink ref decompress(const std::string & method, const std::string & in) { - StringSink ssink; - auto sink = makeDecompressionSink(method, ssink); - (*sink)(in); - sink->finish(); - return ssink.s; + if (method == "br") { + StringSink ssink; + auto sink = makeDecompressionSink(method, ssink); + (*sink)(in); + sink->finish(); + return ssink.s; + } else { + StringSource ssrc(in); + auto src = makeDecompressionSource(ssrc); + return make_ref(src->drain()); + } } -ref makeDecompressionSink(const std::string & method, Sink & nextSink) +std::unique_ptr makeDecompressionSink(const std::string & method, Sink & nextSink) { if (method == "none" || method == "") - return make_ref(nextSink); - else if (method == "xz") - return make_ref(nextSink); - else if (method == "bzip2") - return make_ref(nextSink); - else if (method == "gzip") - return make_ref(nextSink); + return std::make_unique(nextSink); else if (method == "br") - return make_ref(nextSink); + return std::make_unique(nextSink); else - throw UnknownCompressionMethod("unknown compression method '%s'", method); + return sourceToSink([&](Source & source) { + auto decompressionSource = makeDecompressionSource(source); + decompressionSource->drainInto(nextSink); + }); } -struct XzCompressionSink : CompressionSink -{ - Sink & nextSink; - uint8_t outbuf[BUFSIZ]; - lzma_stream strm = LZMA_STREAM_INIT; - bool finished = false; - - XzCompressionSink(Sink & nextSink, bool parallel) : nextSink(nextSink) - { - lzma_ret ret; - bool done = false; - - if (parallel) { -#ifdef HAVE_LZMA_MT - lzma_mt mt_options = {}; - mt_options.flags = 0; - mt_options.timeout = 300; // Using the same setting as the xz cmd line - mt_options.preset = LZMA_PRESET_DEFAULT; - mt_options.filters = NULL; - mt_options.check = LZMA_CHECK_CRC64; - mt_options.threads = lzma_cputhreads(); - mt_options.block_size = 0; - if (mt_options.threads == 0) - mt_options.threads = 1; - // FIXME: maybe use lzma_stream_encoder_mt_memusage() to control the - // number of threads. - ret = lzma_stream_encoder_mt(&strm, &mt_options); - done = true; -#else - printMsg(lvlError, "warning: parallel XZ compression requested but not supported, falling back to single-threaded compression"); -#endif - } - - if (!done) - ret = lzma_easy_encoder(&strm, 6, LZMA_CHECK_CRC64); - - if (ret != LZMA_OK) - throw CompressionError("unable to initialise lzma encoder"); - - // FIXME: apply the x86 BCJ filter? - - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~XzCompressionSink() - { - lzma_end(&strm); - } - - void finish() override - { - CompressionSink::flush(); - write({}); - } - - void write(std::string_view data) override - { - strm.next_in = (const unsigned char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - lzma_ret ret = lzma_code(&strm, data.data() ? LZMA_RUN : LZMA_FINISH); - if (ret != LZMA_OK && ret != LZMA_STREAM_END) - throw CompressionError("error %d while compressing xz file", ret); - - finished = ret == LZMA_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(const char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - -struct BzipCompressionSink : ChunkedCompressionSink -{ - Sink & nextSink; - bz_stream strm; - bool finished = false; - - BzipCompressionSink(Sink & nextSink) : nextSink(nextSink) - { - memset(&strm, 0, sizeof(strm)); - int ret = BZ2_bzCompressInit(&strm, 9, 0, 30); - if (ret != BZ_OK) - throw CompressionError("unable to initialise bzip2 encoder"); - - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - - ~BzipCompressionSink() - { - BZ2_bzCompressEnd(&strm); - } - - void finish() override - { - flush(); - writeInternal({}); - } - - void writeInternal(std::string_view data) override - { - assert(data.size() <= std::numeric_limits::max()); - - strm.next_in = (char *) data.data(); - strm.avail_in = data.size(); - - while (!finished && (!data.data() || strm.avail_in)) { - checkInterrupt(); - - int ret = BZ2_bzCompress(&strm, data.data() ? BZ_RUN : BZ_FINISH); - if (ret != BZ_RUN_OK && ret != BZ_FINISH_OK && ret != BZ_STREAM_END) - throw CompressionError("error %d while compressing bzip2 file", ret); - - finished = ret == BZ_STREAM_END; - - if (strm.avail_out < sizeof(outbuf) || strm.avail_in == 0) { - nextSink({(const char *) outbuf, sizeof(outbuf) - strm.avail_out}); - strm.next_out = (char *) outbuf; - strm.avail_out = sizeof(outbuf); - } - } - } -}; - struct BrotliCompressionSink : ChunkedCompressionSink { Sink & nextSink; @@ -468,15 +251,20 @@ struct BrotliCompressionSink : ChunkedCompressionSink } } }; +std::unique_ptr makeDecompressionSource(Source & prev) { + return std::unique_ptr(new ArchiveDecompressionSource(prev)); +} ref makeCompressionSink(const std::string & method, Sink & nextSink, const bool parallel) { + std::vector la_supports = { + "bzip2", "compress", "grzip", "gzip", "lrzip", "lz4", "lzip", "lzma", "lzop", "xz", "zstd" + }; + if (std::find(la_supports.begin(), la_supports.end(), method) != la_supports.end()) { + return make_ref(nextSink, method, parallel); + } if (method == "none") return make_ref(nextSink); - else if (method == "xz") - return make_ref(nextSink, parallel); - else if (method == "bzip2") - return make_ref(nextSink); else if (method == "br") return make_ref(nextSink); else diff --git a/src/libutil/compression.hh b/src/libutil/compression.hh index dd666a4e1..192cb3e91 100644 --- a/src/libutil/compression.hh +++ b/src/libutil/compression.hh @@ -8,14 +8,18 @@ namespace nix { -struct CompressionSink : BufferedSink +struct CompressionSink : BufferedSink, FinishSink { - virtual void finish() = 0; + using BufferedSink::operator (); + using BufferedSink::write; + using FinishSink::finish; }; +std::unique_ptr makeDecompressionSource(Source & prev); + ref decompress(const std::string & method, const std::string & in); -ref makeDecompressionSink(const std::string & method, Sink & nextSink); +std::unique_ptr makeDecompressionSink(const std::string & method, Sink & nextSink); ref compress(const std::string & method, const std::string & in, const bool parallel = false); diff --git a/src/libutil/serialise.cc b/src/libutil/serialise.cc index d1a16b6ba..374b48d79 100644 --- a/src/libutil/serialise.cc +++ b/src/libutil/serialise.cc @@ -201,6 +201,61 @@ static DefaultStackAllocator defaultAllocatorSingleton; StackAllocator *StackAllocator::defaultAllocator = &defaultAllocatorSingleton; +std::unique_ptr sourceToSink(std::function fun) +{ + struct SourceToSink : FinishSink + { + typedef boost::coroutines2::coroutine coro_t; + + std::function fun; + std::optional coro; + + SourceToSink(std::function fun) : fun(fun) + { + } + + std::string_view cur; + + void operator () (std::string_view in) override + { + if (in.empty()) return; + cur = in; + + if (!coro) + coro = coro_t::push_type(VirtualStackAllocator{}, [&](coro_t::pull_type & yield) { + LambdaSource source([&](char *out, size_t out_len) { + if (cur.empty()) { + yield(); + if (yield.get()) { + return (size_t)0; + } + } + + size_t n = std::min(cur.size(), out_len); + memcpy(out, cur.data(), n); + cur.remove_prefix(n); + return n; + }); + fun(source); + }); + + if (!*coro) { abort(); } + + if (!cur.empty()) (*coro)(false); + } + + void finish() { + if (!coro) return; + if (!*coro) abort(); + (*coro)(true); + if (*coro) abort(); + } + }; + + return std::make_unique(fun); +} + + std::unique_ptr sinkToSource( std::function fun, std::function eof) @@ -212,7 +267,6 @@ std::unique_ptr sinkToSource( std::function fun; std::function eof; std::optional coro; - bool started = false; SinkToSource(std::function fun, std::function eof) : fun(fun), eof(eof) diff --git a/src/libutil/serialise.hh b/src/libutil/serialise.hh index 5bbbc7ce3..0fe6e8332 100644 --- a/src/libutil/serialise.hh +++ b/src/libutil/serialise.hh @@ -25,6 +25,13 @@ struct NullSink : Sink { } }; + +struct FinishSink : virtual Sink +{ + virtual void finish() = 0; +}; + + /* A buffered abstract sink. Warning: a BufferedSink should not be used from multiple threads concurrently. */ struct BufferedSink : virtual Sink @@ -281,6 +288,7 @@ struct ChainSource : Source size_t read(char * data, size_t len) override; }; +std::unique_ptr sourceToSink(std::function fun); /* Convert a function that feeds data into a Sink into a Source. The Source executes the function as a coroutine. */ diff --git a/src/libutil/tarfile.cc b/src/libutil/tarfile.cc index 2da169ba7..b5e1cb4c0 100644 --- a/src/libutil/tarfile.cc +++ b/src/libutil/tarfile.cc @@ -2,83 +2,73 @@ #include #include "serialise.hh" +#include "tarfile.hh" namespace nix { +static int callback_open(struct archive *, void *self) { + return ARCHIVE_OK; +} -struct TarArchive { - struct archive * archive; - Source * source; - std::vector buffer; +static ssize_t callback_read(struct archive * archive, void * _self, const void * * buffer) { + TarArchive *self = (TarArchive *)_self; + *buffer = self->buffer.data(); - void check(int err, const char * reason = "failed to extract archive: %s") - { + try { + return self->source->read((char *) self->buffer.data(), 4096); + } catch (EndOfFile &) { + return 0; + } catch (std::exception &err) { + archive_set_error(archive, EIO, "Source threw exception: %s", err.what()); + + return -1; + } +} + +static int callback_close(struct archive *, void *self) { + return ARCHIVE_OK; +} + +void TarArchive::check(int err, const char *reason) +{ if (err == ARCHIVE_EOF) throw EndOfFile("reached end of archive"); else if (err != ARCHIVE_OK) throw Error(reason, archive_error_string(this->archive)); } - TarArchive(Source & source) : buffer(4096) - { - this->archive = archive_read_new(); - this->source = &source; +TarArchive::TarArchive(Source& source, bool raw) : buffer(4096) +{ + this->archive = archive_read_new(); + this->source = &source; + if (!raw) { archive_read_support_filter_all(archive); archive_read_support_format_all(archive); - check(archive_read_open(archive, - (void *)this, - TarArchive::callback_open, - TarArchive::callback_read, - TarArchive::callback_close), - "failed to open archive: %s"); - } - - TarArchive(const Path & path) - { - this->archive = archive_read_new(); - + } else { archive_read_support_filter_all(archive); - archive_read_support_format_all(archive); - check(archive_read_open_filename(archive, path.c_str(), 16384), "failed to open archive: %s"); + archive_read_support_format_raw(archive); + archive_read_support_format_empty(archive); } + check(archive_read_open(archive, (void *)this, callback_open, callback_read, callback_close), "Failed to open archive (%s)"); +} - TarArchive(const TarArchive &) = delete; - void close() - { - check(archive_read_close(archive), "failed to close archive: %s"); - } +TarArchive::TarArchive(const Path &path) +{ + this->archive = archive_read_new(); - ~TarArchive() - { - if (this->archive) archive_read_free(this->archive); - } + archive_read_support_filter_all(archive); + archive_read_support_format_all(archive); + check(archive_read_open_filename(archive, path.c_str(), 16384), "failed to open archive: %s"); +} -private: +void TarArchive::close() { + check(archive_read_close(this->archive), "Failed to close archive (%s)"); +} - static int callback_open(struct archive *, void * self) { - return ARCHIVE_OK; - } - - static ssize_t callback_read(struct archive * archive, void * _self, const void * * buffer) - { - auto self = (TarArchive *)_self; - *buffer = self->buffer.data(); - - try { - return self->source->read((char *) self->buffer.data(), 4096); - } catch (EndOfFile &) { - return 0; - } catch (std::exception & err) { - archive_set_error(archive, EIO, "source threw exception: %s", err.what()); - return -1; - } - } - - static int callback_close(struct archive *, void * self) { - return ARCHIVE_OK; - } -}; +TarArchive::~TarArchive() { + if (this->archive) archive_read_free(this->archive); +} static void extract_archive(TarArchive & archive, const Path & destDir) { diff --git a/src/libutil/tarfile.hh b/src/libutil/tarfile.hh index 89a024f1d..18adf3490 100644 --- a/src/libutil/tarfile.hh +++ b/src/libutil/tarfile.hh @@ -1,7 +1,26 @@ #include "serialise.hh" +#include namespace nix { +struct TarArchive { + struct archive *archive; + Source *source; + std::vector buffer; + + void check(int err, const char *reason = "Failed to extract archive (%s)"); + + TarArchive(Source& source, bool raw = false); + + TarArchive(const Path &path); + + // disable copy constructor + TarArchive(const TarArchive&) = delete; + + void close(); + + ~TarArchive(); +}; void unpackTarfile(Source & source, const Path & destDir); void unpackTarfile(const Path & tarFile, const Path & destDir);