Make LocalBinaryCacheStore::narFromPath() run in constant memory

This reduces memory consumption of

  nix copy --from file://... --to ~/my-nix /nix/store/95cwv4q54dc6giaqv6q6p4r02ia2km35-blender-2.79

from 514 MiB to 18 MiB for an uncompressed binary cache, and from 192
MiB to 53 MiB for a bzipped binary cache. It may also be faster
because fetching can happen concurrently with decompression/writing.

Continuation of 48662d151b.

Issue https://github.com/NixOS/nix/issues/1681.
This commit is contained in:
Eelco Dolstra 2018-03-27 23:12:31 +02:00
parent 81ea8bd5ce
commit 08ec757726
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
6 changed files with 65 additions and 31 deletions

View file

@ -54,7 +54,15 @@ void BinaryCacheStore::init()
} }
} }
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path) void BinaryCacheStore::getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback)
{
try {
callback(getFile(path));
} catch (...) { callback.rethrow(); }
}
void BinaryCacheStore::getFile(const std::string & path, Sink & sink)
{ {
std::promise<std::shared_ptr<std::string>> promise; std::promise<std::shared_ptr<std::string>> promise;
getFile(path, getFile(path,
@ -65,7 +73,19 @@ std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
promise.set_exception(std::current_exception()); promise.set_exception(std::current_exception());
} }
}}); }});
return promise.get_future().get(); auto data = promise.get_future().get();
sink((unsigned char *) data->data(), data->size());
}
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
{
StringSink sink;
try {
getFile(path, sink);
} catch (NoSuchBinaryCacheFile &) {
return nullptr;
}
return sink.s;
} }
Path BinaryCacheStore::narInfoFileFor(const Path & storePath) Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
@ -197,23 +217,21 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
{ {
auto info = queryPathInfo(storePath).cast<const NarInfo>(); auto info = queryPathInfo(storePath).cast<const NarInfo>();
auto nar = getFile(info->url); auto source = sinkToSource([this, url{info->url}](Sink & sink) {
getFile(url, sink);
if (!nar) throw Error(format("file '%s' missing from binary cache") % info->url); });
stats.narRead++; stats.narRead++;
stats.narReadCompressedBytes += nar->size(); //stats.narReadCompressedBytes += nar->size(); // FIXME
uint64_t narSize = 0; uint64_t narSize = 0;
StringSource source(*nar);
LambdaSink wrapperSink([&](const unsigned char * data, size_t len) { LambdaSink wrapperSink([&](const unsigned char * data, size_t len) {
sink(data, len); sink(data, len);
narSize += len; narSize += len;
}); });
decompress(info->compression, source, wrapperSink); decompress(info->compression, *source, wrapperSink);
stats.narReadBytes += narSize; stats.narReadBytes += narSize;
} }

View file

@ -38,10 +38,16 @@ public:
const std::string & data, const std::string & data,
const std::string & mimeType) = 0; const std::string & mimeType) = 0;
/* Return the contents of the specified file, or null if it /* Note: subclasses must implement at least one of the two
doesn't exist. */ following getFile() methods. */
/* Dump the contents of the specified file to a sink. */
virtual void getFile(const std::string & path, Sink & sink);
/* Fetch the specified file and call the specified callback with
the result. A subclass may implement this asynchronously. */
virtual void getFile(const std::string & path, virtual void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) = 0; Callback<std::shared_ptr<std::string>> callback);
std::shared_ptr<std::string> getFile(const std::string & path); std::shared_ptr<std::string> getFile(const std::string & path);
@ -129,4 +135,6 @@ public:
}; };
MakeError(NoSuchBinaryCacheFile, Error);
} }

View file

@ -34,15 +34,14 @@ protected:
const std::string & data, const std::string & data,
const std::string & mimeType) override; const std::string & mimeType) override;
/* Stream the file `path`, resolved relative to `binaryCacheDir`, into
   `sink` using readFile().

   Throws NoSuchBinaryCacheFile if opening/reading fails with ENOENT.
   Any other SysError is rethrown unchanged: previously such errors
   were caught and dropped, which made a failed read indistinguishable
   from a successful one. */
void getFile(const std::string & path, Sink & sink) override
{
    try {
        readFile(binaryCacheDir + "/" + path, sink);
    } catch (SysError & e) {
        // A missing file gets the dedicated exception type so that
        // callers can tell "not in the cache" apart from real failures.
        if (e.errNo == ENOENT)
            throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache", path);
        // Every other errno must reach the caller instead of being
        // silently discarded.
        throw;
    }
}
PathSet queryAllValidPaths() override PathSet queryAllValidPaths() override

View file

@ -364,23 +364,23 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
uploadFile(path, data, mimeType, ""); uploadFile(path, data, mimeType, "");
} }
void getFile(const std::string & path, void getFile(const std::string & path, Sink & sink) override
Callback<std::shared_ptr<std::string>> callback) override
{ {
try { stats.get++;
stats.get++;
auto res = s3Helper.getObject(bucketName, path); // FIXME: stream output to sink.
auto res = s3Helper.getObject(bucketName, path);
stats.getBytes += res.data ? res.data->size() : 0; stats.getBytes += res.data ? res.data->size() : 0;
stats.getTimeMs += res.durationMs; stats.getTimeMs += res.durationMs;
if (res.data) if (res.data) {
printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms", printTalkative("downloaded 's3://%s/%s' (%d bytes) in %d ms",
bucketName, path, res.data->size(), res.durationMs); bucketName, path, res.data->size(), res.durationMs);
callback(std::move(res.data)); sink((unsigned char *) res.data->data(), res.data->size());
} catch (...) { callback.rethrow(); } } else
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
} }
PathSet queryAllValidPaths() override PathSet queryAllValidPaths() override

View file

@ -311,6 +311,14 @@ string readFile(const Path & path, bool drain)
} }
void readFile(const Path & path, Sink & sink)
{
AutoCloseFD fd = open(path.c_str(), O_RDONLY | O_CLOEXEC);
if (!fd) throw SysError("opening file '%s'", path);
drainFD(fd.get(), sink);
}
void writeFile(const Path & path, const string & s, mode_t mode) void writeFile(const Path & path, const string & s, mode_t mode)
{ {
AutoCloseFD fd = open(path.c_str(), O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC, mode); AutoCloseFD fd = open(path.c_str(), O_WRONLY | O_TRUNC | O_CREAT | O_CLOEXEC, mode);
@ -593,7 +601,7 @@ void drainFD(int fd, Sink & sink, bool block)
throw SysError("making file descriptor non-blocking"); throw SysError("making file descriptor non-blocking");
} }
std::vector<unsigned char> buf(4096); std::vector<unsigned char> buf(64 * 1024);
while (1) { while (1) {
checkInterrupt(); checkInterrupt();
ssize_t rd = read(fd, buf.data(), buf.size()); ssize_t rd = read(fd, buf.data(), buf.size());

View file

@ -98,6 +98,7 @@ unsigned char getFileType(const Path & path);
/* Read the contents of a file into a string. */ /* Read the contents of a file into a string. */
string readFile(int fd); string readFile(int fd);
string readFile(const Path & path, bool drain = false); string readFile(const Path & path, bool drain = false);
void readFile(const Path & path, Sink & sink);
/* Write a string to a file. */ /* Write a string to a file. */
void writeFile(const Path & path, const string & s, mode_t mode = 0666); void writeFile(const Path & path, const string & s, mode_t mode = 0666);