decompress(): Use a Source and Sink

This allows decompression to happen in O(1) memory.
This commit is contained in:
Eelco Dolstra 2018-03-16 16:59:31 +01:00
parent 64441f0551
commit 3e6b194d78
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
6 changed files with 210 additions and 91 deletions

View file

@ -203,22 +203,18 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
stats.narRead++;
stats.narReadCompressedBytes += nar->size();
/* Decompress the NAR. FIXME: would be nice to have the remote
side do this. */
try {
nar = decompress(info->compression, *nar);
} catch (UnknownCompressionMethod &) {
throw Error(format("binary cache path '%s' uses unknown compression method '%s'")
% storePath % info->compression);
}
uint64_t narSize = 0;
stats.narReadBytes += nar->size();
StringSource source(*nar);
printMsg(lvlTalkative, format("exporting path '%1%' (%2% bytes)") % storePath % nar->size());
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
sink(data, len);
narSize += len;
});
assert(nar->size() % 8 == 0);
decompress(info->compression, source, wrapperSink);
sink((unsigned char *) nar->c_str(), nar->size());
stats.narReadBytes += narSize;
}
void BinaryCacheStore::queryPathInfoUncached(const Path & storePath,

View file

@ -17,7 +17,23 @@
namespace nix {
static ref<std::string> decompressXZ(const std::string & in)
static const size_t bufSize = 32 * 1024;
/* Pass-through "decompressor" for the compression method "none":
   copies everything readable from 'source' into 'sink' unchanged,
   one bufSize-sized chunk at a time, until the source signals
   EndOfFile. */
static void decompressNone(Source & source, Sink & sink)
{
    std::vector<unsigned char> chunk(bufSize);

    for (;;) {
        size_t got;
        try {
            got = source.read(chunk.data(), chunk.size());
        } catch (EndOfFile &) {
            /* End of input: nothing more to forward. */
            return;
        }
        sink(chunk.data(), got);
    }
}
static void decompressXZ(Source & source, Sink & sink)
{
lzma_stream strm(LZMA_STREAM_INIT);
@ -29,36 +45,44 @@ static ref<std::string> decompressXZ(const std::string & in)
Finally free([&]() { lzma_end(&strm); });
lzma_action action = LZMA_RUN;
uint8_t outbuf[BUFSIZ];
ref<std::string> res = make_ref<std::string>();
strm.next_in = (uint8_t *) in.c_str();
strm.avail_in = in.size();
strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf);
std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
strm.next_in = nullptr;
strm.avail_in = 0;
strm.next_out = outbuf.data();
strm.avail_out = outbuf.size();
bool eof = false;
while (true) {
checkInterrupt();
if (strm.avail_in == 0 && !eof) {
strm.next_in = inbuf.data();
try {
strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
} catch (EndOfFile &) {
eof = true;
}
}
if (strm.avail_in == 0)
action = LZMA_FINISH;
lzma_ret ret = lzma_code(&strm, action);
if (strm.avail_out == 0 || ret == LZMA_STREAM_END) {
res->append((char *) outbuf, sizeof(outbuf) - strm.avail_out);
strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf);
if (strm.avail_out < outbuf.size()) {
sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
strm.next_out = outbuf.data();
strm.avail_out = outbuf.size();
}
if (ret == LZMA_STREAM_END)
return res;
if (ret == LZMA_STREAM_END) return;
if (ret != LZMA_OK)
throw CompressionError("error %d while decompressing xz file", ret);
}
}
static ref<std::string> decompressBzip2(const std::string & in)
static void decompressBzip2(Source & source, Sink & sink)
{
bz_stream strm;
memset(&strm, 0, sizeof(strm));
@ -69,39 +93,50 @@ static ref<std::string> decompressBzip2(const std::string & in)
Finally free([&]() { BZ2_bzDecompressEnd(&strm); });
char outbuf[BUFSIZ];
ref<std::string> res = make_ref<std::string>();
strm.next_in = (char *) in.c_str();
strm.avail_in = in.size();
strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf);
std::vector<char> inbuf(bufSize), outbuf(bufSize);
strm.next_in = nullptr;
strm.avail_in = 0;
strm.next_out = outbuf.data();
strm.avail_out = outbuf.size();
bool eof = false;
while (true) {
checkInterrupt();
int ret = BZ2_bzDecompress(&strm);
if (strm.avail_out == 0 || ret == BZ_STREAM_END) {
res->append(outbuf, sizeof(outbuf) - strm.avail_out);
strm.next_out = outbuf;
strm.avail_out = sizeof(outbuf);
if (strm.avail_in == 0 && !eof) {
strm.next_in = inbuf.data();
try {
strm.avail_in = source.read((unsigned char *) strm.next_in, inbuf.size());
} catch (EndOfFile &) {
eof = true;
}
}
if (ret == BZ_STREAM_END)
return res;
int ret = BZ2_bzDecompress(&strm);
if (strm.avail_in == 0 && strm.avail_out == outbuf.size() && eof)
throw CompressionError("bzip2 data ends prematurely");
if (strm.avail_out < outbuf.size()) {
sink((unsigned char *) outbuf.data(), outbuf.size() - strm.avail_out);
strm.next_out = outbuf.data();
strm.avail_out = outbuf.size();
}
if (ret == BZ_STREAM_END) return;
if (ret != BZ_OK)
throw CompressionError("error while decompressing bzip2 file");
if (strm.avail_in == 0)
throw CompressionError("bzip2 data ends prematurely");
}
}
static ref<std::string> decompressBrotli(const std::string & in)
static void decompressBrotli(Source & source, Sink & sink)
{
#if !HAVE_BROTLI
return make_ref<std::string>(runProgram(BROTLI, true, {"-d"}, {in}));
RunOptions options(BROTLI, {"-d"});
options.stdin = &source;
options.stdout = &sink;
runProgram2(options);
#else
auto *s = BrotliDecoderCreateInstance(nullptr, nullptr, nullptr);
if (!s)
@ -109,16 +144,26 @@ static ref<std::string> decompressBrotli(const std::string & in)
Finally free([s]() { BrotliDecoderDestroyInstance(s); });
uint8_t outbuf[BUFSIZ];
ref<std::string> res = make_ref<std::string>();
const uint8_t *next_in = (uint8_t *)in.c_str();
size_t avail_in = in.size();
uint8_t *next_out = outbuf;
size_t avail_out = sizeof(outbuf);
std::vector<uint8_t> inbuf(bufSize), outbuf(bufSize);
const uint8_t * next_in = nullptr;
size_t avail_in = 0;
bool eof = false;
while (true) {
checkInterrupt();
if (avail_in == 0 && !eof) {
next_in = inbuf.data();
try {
avail_in = source.read((unsigned char *) next_in, inbuf.size());
} catch (EndOfFile &) {
eof = true;
}
}
uint8_t * next_out = outbuf.data();
size_t avail_out = outbuf.size();
auto ret = BrotliDecoderDecompressStream(s,
&avail_in, &next_in,
&avail_out, &next_out,
@ -128,51 +173,49 @@ static ref<std::string> decompressBrotli(const std::string & in)
case BROTLI_DECODER_RESULT_ERROR:
throw CompressionError("error while decompressing brotli file");
case BROTLI_DECODER_RESULT_NEEDS_MORE_INPUT:
if (eof)
throw CompressionError("incomplete or corrupt brotli file");
break;
case BROTLI_DECODER_RESULT_SUCCESS:
if (avail_in != 0)
throw CompressionError("unexpected input after brotli decompression");
break;
case BROTLI_DECODER_RESULT_NEEDS_MORE_OUTPUT:
// I'm not sure if this can happen, but abort if this happens with empty buffer
if (avail_out == sizeof(outbuf))
if (avail_out == outbuf.size())
throw CompressionError("brotli decompression requires larger buffer");
break;
}
// Always ensure we have full buffer for next invocation
if (avail_out < sizeof(outbuf)) {
res->append((char*)outbuf, sizeof(outbuf) - avail_out);
next_out = outbuf;
avail_out = sizeof(outbuf);
}
if (avail_out < outbuf.size())
sink((unsigned char *) outbuf.data(), outbuf.size() - avail_out);
if (ret == BROTLI_DECODER_RESULT_SUCCESS) return res;
if (ret == BROTLI_DECODER_RESULT_SUCCESS) return;
}
#endif // HAVE_BROTLI
}
/* Compress the string 'in' with the given method and return the
   compressed bytes. The input is pushed through a compression sink
   obtained from makeCompressionSink() that writes into an in-memory
   string sink; 'parallel' is forwarded to makeCompressionSink(). */
ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
{
    StringSink collected;
    auto compressor = makeCompressionSink(method, collected, parallel);

    (*compressor)(in);
    compressor->finish();

    return collected.s;
}
/* Convenience overload: decompress the in-memory string 'in' with
   the given method and return the result as a string. It wraps 'in'
   in a StringSource and collects the output in a StringSink, then
   delegates to the Source/Sink overload of decompress(). */
ref<std::string> decompress(const std::string & method, const std::string & in)
{
    StringSource input(in);
    StringSink output;

    decompress(method, input, output);

    return output.s;
}
/* Decompress everything read from 'source' into 'sink', selecting
   the decompressor by the 'method' string: "none", "xz", "bzip2" or
   "br". Any other value throws UnknownCompressionMethod.
   NOTE(review): this span comes from a rendered diff, so each branch
   appears to list the pre-change line followed by its replacement --
   confirm against the real file before editing. */
void decompress(const std::string & method, Source & source, Sink & sink)
{
if (method == "none")
return make_ref<std::string>(in);
return decompressNone(source, sink);
else if (method == "xz")
return decompressXZ(in);
return decompressXZ(source, sink);
else if (method == "bzip2")
return decompressBzip2(in);
return decompressBzip2(source, sink);
else if (method == "br")
return decompressBrotli(in);
return decompressBrotli(source, sink);
else
/* No decompressor is known for this method name. */
throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
throw UnknownCompressionMethod("unknown compression method '%s'", method);
}
struct NoneSink : CompressionSink
@ -499,4 +542,13 @@ ref<CompressionSink> makeCompressionSink(const std::string & method, Sink & next
throw UnknownCompressionMethod(format("unknown compression method '%s'") % method);
}
/* Compress the string 'in' with the given method and return the
   compressed bytes. The input is pushed through a compression sink
   obtained from makeCompressionSink() that writes into an in-memory
   string sink; 'parallel' is forwarded to makeCompressionSink(). */
ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel)
{
    StringSink collected;
    auto compressor = makeCompressionSink(method, collected, parallel);

    (*compressor)(in);
    compressor->finish();

    return collected.s;
}
}

View file

@ -8,10 +8,12 @@
namespace nix {
ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
ref<std::string> decompress(const std::string & method, const std::string & in);
void decompress(const std::string & method, Source & source, Sink & sink);
ref<std::string> compress(const std::string & method, const std::string & in, const bool parallel = false);
struct CompressionSink : BufferedSink
{
virtual void finish() = 0;

View file

@ -56,7 +56,7 @@ struct Source
void operator () (unsigned char * data, size_t len);
/* Store up to len in the buffer pointed to by data, and
return the number of bytes stored. If blocks until at least
return the number of bytes stored. It blocks until at least
one byte is available. */
virtual size_t read(unsigned char * data, size_t len) = 0;
@ -175,6 +175,22 @@ struct TeeSource : Source
};
/* Adapter that turns a callable into a Sink: every (data, len) chunk
   written to the sink is handed to the stored function. */
struct LambdaSink : Sink
{
    using lambda_t = std::function<void(const unsigned char *, size_t)>;

    /* The function invoked for each chunk written to this sink. */
    lambda_t lambda;

    LambdaSink(const lambda_t & fun) : lambda(fun) { }

    virtual void operator () (const unsigned char * data, size_t len)
    {
        lambda(data, len);
    }
};
void writePadding(size_t len, Sink & sink);
void writeString(const unsigned char * buf, size_t len, Sink & sink);

View file

@ -3,6 +3,7 @@
#include "affinity.hh"
#include "sync.hh"
#include "finally.hh"
#include "serialise.hh"
#include <cctype>
#include <cerrno>
@ -568,19 +569,25 @@ void writeFull(int fd, const string & s, bool allowInterrupts)
/* Read file descriptor 'fd' until end-of-file and return everything
   that was read as a string, by draining into an in-memory
   StringSink via the drainFD(fd, sink) overload.
   NOTE(review): this span comes from a rendered diff; the 'result'
   and 'buffer' declarations look like removed lines of the previous
   implementation -- confirm against the real file. */
string drainFD(int fd)
{
string result;
unsigned char buffer[4096];
StringSink sink;
drainFD(fd, sink);
/* Move the collected data out of the sink instead of copying it. */
return std::move(*sink.s);
}
/* Read file descriptor 'fd' until end-of-file, passing each chunk
   that was read to 'sink'. checkInterrupt() is called before every
   read. Throws SysError if read() fails with anything other than
   EINTR.
   NOTE(review): this span comes from a rendered diff; the lines that
   use 'buffer' and 'result' look like removed lines of the previous
   implementation -- confirm against the real file. */
void drainFD(int fd, Sink & sink)
{
std::vector<unsigned char> buf(4096);
while (1) {
checkInterrupt();
ssize_t rd = read(fd, buffer, sizeof buffer);
ssize_t rd = read(fd, buf.data(), buf.size());
if (rd == -1) {
/* EINTR is not an error here: fall through and retry the read. */
if (errno != EINTR)
throw SysError("reading from file");
}
/* A zero-byte read means end-of-file. */
else if (rd == 0) break;
else result.append((char *) buffer, rd);
else sink(buf.data(), rd);
}
return result;
}
@ -920,20 +927,47 @@ string runProgram(Path program, bool searchPath, const Strings & args,
return res.second;
}
std::pair<int, std::string> runProgram(const RunOptions & options)
std::pair<int, std::string> runProgram(const RunOptions & options_)
{
RunOptions options(options_);
StringSink sink;
options.stdout = &sink;
int status = 0;
try {
runProgram2(options);
} catch (ExecError & e) {
status = e.status;
}
return {status, std::move(*sink.s)};
}
void runProgram2(const RunOptions & options)
{
checkInterrupt();
assert(!(options.stdin && options.input));
std::unique_ptr<Source> source_;
Source * source = options.stdin;
if (options.input) {
source_ = std::make_unique<StringSource>(*options.input);
source = source_.get();
}
/* Create a pipe. */
Pipe out, in;
out.create();
if (options.input) in.create();
if (options.stdout) out.create();
if (source) in.create();
/* Fork. */
Pid pid = startProcess([&]() {
if (dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
if (options.stdout && dup2(out.writeSide.get(), STDOUT_FILENO) == -1)
throw SysError("dupping stdout");
if (options.input && dup2(in.readSide.get(), STDIN_FILENO) == -1)
if (source && dup2(in.readSide.get(), STDIN_FILENO) == -1)
throw SysError("dupping stdin");
Strings args_(options.args);
@ -961,11 +995,20 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
});
if (options.input) {
if (source) {
in.readSide = -1;
writerThread = std::thread([&]() {
try {
writeFull(in.writeSide.get(), *options.input);
std::vector<unsigned char> buf(8 * 1024);
while (true) {
size_t n;
try {
n = source->read(buf.data(), buf.size());
} catch (EndOfFile &) {
break;
}
writeFull(in.writeSide.get(), buf.data(), n);
}
promise.set_value();
} catch (...) {
promise.set_exception(std::current_exception());
@ -974,15 +1017,17 @@ std::pair<int, std::string> runProgram(const RunOptions & options)
});
}
string result = drainFD(out.readSide.get());
if (options.stdout)
drainFD(out.readSide.get(), *options.stdout);
/* Wait for the child to finish. */
int status = pid.wait();
/* Wait for the writer thread to finish. */
if (options.input) promise.get_future().get();
if (source) promise.get_future().get();
return {status, result};
if (status)
throw ExecError(status, fmt("program '%1%' %2%", options.program, statusToString(status)));
}

View file

@ -25,6 +25,9 @@
namespace nix {
struct Sink;
struct Source;
/* Return an environment variable. */
string getEnv(const string & key, const string & def = "");
@ -150,6 +153,7 @@ MakeError(EndOfFile, Error)
/* Read a file descriptor until EOF occurs. */
string drainFD(int fd);
void drainFD(int fd, Sink & sink);
/* Automatic cleanup of resources. */
@ -256,6 +260,8 @@ struct RunOptions
bool searchPath = true;
Strings args;
std::experimental::optional<std::string> input;
Source * stdin = nullptr;
Sink * stdout = nullptr;
bool _killStderr = false;
RunOptions(const Path & program, const Strings & args)
@ -266,6 +272,8 @@ struct RunOptions
std::pair<int, std::string> runProgram(const RunOptions & options);
void runProgram2(const RunOptions & options);
class ExecError : public Error
{