Make HttpBinaryCacheStore::narFromPath() run in constant memory

This reduces memory consumption of

  nix copy --from https://cache.nixos.org --to ~/my-nix /nix/store/95cwv4q54dc6giaqv6q6p4r02ia2km35-blender-2.79

from 176 MiB to 82 MiB. (The remaining memory is probably due to xz
decompression overhead.)

Issue https://github.com/NixOS/nix/issues/1681.
Issue https://github.com/NixOS/nix/issues/1969.
This commit is contained in:
Eelco Dolstra 2018-03-28 00:01:47 +02:00
parent 08ec757726
commit e87e4a60d6
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
3 changed files with 116 additions and 3 deletions

View file

@ -7,6 +7,7 @@
#include "s3.hh" #include "s3.hh"
#include "compression.hh" #include "compression.hh"
#include "pathlocks.hh" #include "pathlocks.hh"
#include "finally.hh"
#ifdef ENABLE_S3 #ifdef ENABLE_S3
#include <aws/core/client/ClientConfiguration.h> #include <aws/core/client/ClientConfiguration.h>
@ -137,7 +138,10 @@ struct CurlDownloader : public Downloader
size_t writeCallback(void * contents, size_t size, size_t nmemb) size_t writeCallback(void * contents, size_t size, size_t nmemb)
{ {
size_t realSize = size * nmemb; size_t realSize = size * nmemb;
result.data->append((char *) contents, realSize); if (request.dataCallback)
request.dataCallback((char *) contents, realSize);
else
result.data->append((char *) contents, realSize);
return realSize; return realSize;
} }
@ -635,6 +639,92 @@ DownloadResult Downloader::download(const DownloadRequest & request)
return enqueueDownload(request).get(); return enqueueDownload(request).get();
} }
void Downloader::download(DownloadRequest && request, Sink & sink)
{
/* Note: we can't call 'sink' via request.dataCallback, because
that would cause the sink to execute on the downloader
thread. If 'sink' is a coroutine, this will fail. Also, if the
sink is expensive (e.g. one that does decompression and writing
to the Nix store), it would stall the download thread too much.
Therefore we use a buffer to communicate data between the
download thread and the calling thread. */
struct State {
bool quit = false;
std::exception_ptr exc;
std::string data;
std::condition_variable avail, request;
};
auto _state = std::make_shared<Sync<State>>();
/* In case of an exception, wake up the download thread. FIXME:
abort the download request. */
Finally finally([&]() {
auto state(_state->lock());
state->quit = true;
state->request.notify_one();
});
request.dataCallback = [_state](char * buf, size_t len) {
auto state(_state->lock());
if (state->quit) return;
/* If the buffer is full, then go to sleep until the calling
thread wakes us up (i.e. when it has removed data from the
buffer). Note: this does stall the download thread. */
while (state->data.size() > 1024 * 1024) {
if (state->quit) return;
debug("download buffer is full; going to sleep");
state.wait(state->request);
}
/* Append data to the buffer and wake up the calling
thread. */
state->data.append(buf, len);
state->avail.notify_one();
};
enqueueDownload(request,
{[_state](std::future<DownloadResult> fut) {
auto state(_state->lock());
state->quit = true;
try {
fut.get();
} catch (...) {
state->exc = std::current_exception();
}
state->avail.notify_one();
state->request.notify_one();
}});
auto state(_state->lock());
while (true) {
checkInterrupt();
if (state->quit) {
if (state->exc) std::rethrow_exception(state->exc);
break;
}
/* If no data is available, then wait for the download thread
to wake us up. */
if (state->data.empty())
state.wait(state->avail);
/* If data is available, then flush it to the sink and wake up
the download thread if it's blocked on a full buffer. */
if (!state->data.empty()) {
sink((unsigned char *) state->data.data(), state->data.size());
state->data.clear();
state->request.notify_one();
}
}
}
Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl) Path Downloader::downloadCached(ref<Store> store, const string & url_, bool unpack, string name, const Hash & expectedHash, string * effectiveUrl, int ttl)
{ {
auto url = resolveUri(url_); auto url = resolveUri(url_);

View file

@ -21,6 +21,7 @@ struct DownloadRequest
bool decompress = true; bool decompress = true;
std::shared_ptr<std::string> data; std::shared_ptr<std::string> data;
std::string mimeType; std::string mimeType;
std::function<void(char *, size_t)> dataCallback;
DownloadRequest(const std::string & uri) DownloadRequest(const std::string & uri)
: uri(uri), parentAct(getCurActivity()) { } : uri(uri), parentAct(getCurActivity()) { }
@ -49,6 +50,10 @@ struct Downloader
/* Synchronously download a file. */ /* Synchronously download a file. */
DownloadResult download(const DownloadRequest & request); DownloadResult download(const DownloadRequest & request);
/* Download a file, writing its data to a sink. The sink will be
invoked on the thread of the caller. */
void download(DownloadRequest && request, Sink & sink);
/* Check if the specified file is already in ~/.cache/nix/tarballs /* Check if the specified file is already in ~/.cache/nix/tarballs
and is more recent than tarball-ttl seconds. Otherwise, and is more recent than tarball-ttl seconds. Otherwise,
use the recorded ETag to verify if the server has a more use the recorded ETag to verify if the server has a more

View file

@ -77,11 +77,29 @@ protected:
} }
} }
void getFile(const std::string & path, DownloadRequest makeRequest(const std::string & path)
Callback<std::shared_ptr<std::string>> callback) override
{ {
DownloadRequest request(cacheUri + "/" + path); DownloadRequest request(cacheUri + "/" + path);
request.tries = 8; request.tries = 8;
return request;
}
void getFile(const std::string & path, Sink & sink) override
{
auto request(makeRequest(path));
try {
getDownloader()->download(std::move(request), sink);
} catch (DownloadError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
throw NoSuchBinaryCacheFile("file '%s' does not exist in binary cache '%s'", path, getUri());
throw;
}
}
void getFile(const std::string & path,
Callback<std::shared_ptr<std::string>> callback) override
{
auto request(makeRequest(path));
getDownloader()->enqueueDownload(request, getDownloader()->enqueueDownload(request,
{[callback](std::future<DownloadResult> result) { {[callback](std::future<DownloadResult> result) {