Make computeFSClosure() single-threaded again

The fact that queryPathInfo() is synchronous meant that we needed a
thread for every concurrent binary cache lookup, even though they end
up being handled by the same download thread. Requiring hundreds of
threads is not a good idea. So now there is an asynchronous version of
queryPathInfo() that takes a callback function to process the
result. Similarly, enqueueDownload() now takes a callback rather than
returning a future.

Thus, a command like

  nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5

that returns 4941 paths now takes 1.87s using only 2 threads (the main
thread and the downloader thread). (This is with a prewarmed
CloudFront.)
This commit is contained in:
Eelco Dolstra 2016-09-16 18:54:14 +02:00
parent 054be50257
commit 75989bdca7
16 changed files with 410 additions and 227 deletions

View file

@ -12,6 +12,8 @@
#include <chrono>
#include <future>
namespace nix {
BinaryCacheStore::BinaryCacheStore(const Params & params)
@ -58,6 +60,19 @@ void BinaryCacheStore::notImpl()
throw Error("operation not implemented for binary cache stores");
}
std::shared_ptr<std::string> BinaryCacheStore::getFile(const std::string & path)
{
std::promise<std::shared_ptr<std::string>> promise;
getFile(path,
[&](std::shared_ptr<std::string> result) {
promise.set_value(result);
},
[&](std::exception_ptr exc) {
promise.set_exception(exc);
});
return promise.get_future().get();
}
Path BinaryCacheStore::narInfoFileFor(const Path & storePath)
{
assertStorePath(storePath);
@ -176,17 +191,22 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink)
sink((unsigned char *) nar->c_str(), nar->size());
}
std::shared_ptr<ValidPathInfo> BinaryCacheStore::queryPathInfoUncached(const Path & storePath)
void BinaryCacheStore::queryPathInfoUncached(const Path & storePath,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
auto narInfoFile = narInfoFileFor(storePath);
auto data = getFile(narInfoFile);
if (!data) return 0;
auto narInfo = make_ref<NarInfo>(*this, *data, narInfoFile);
getFile(narInfoFile,
[=](std::shared_ptr<std::string> data) {
if (!data) return success(0);
stats.narInfoRead++;
return std::shared_ptr<NarInfo>(narInfo);
callSuccess(success, failure, (std::shared_ptr<ValidPathInfo>)
std::make_shared<NarInfo>(*this, *data, narInfoFile));
},
failure);
}
Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath,

View file

@ -31,7 +31,11 @@ protected:
/* Return the contents of the specified file, or null if it
doesn't exist. */
virtual std::shared_ptr<std::string> getFile(const std::string & path) = 0;
virtual void getFile(const std::string & path,
std::function<void(std::shared_ptr<std::string>)> success,
std::function<void(std::exception_ptr exc)> failure) = 0;
std::shared_ptr<std::string> getFile(const std::string & path);
bool wantMassQuery_ = false;
int priority = 50;
@ -56,7 +60,9 @@ public:
PathSet queryAllValidPaths() override
{ notImpl(); }
std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
void queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path,
PathSet & referrers) override

View file

@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader
CurlDownloader & downloader;
DownloadRequest request;
DownloadResult result;
bool done = false; // whether the promise has been set
std::promise<DownloadResult> promise;
bool done = false; // whether either the success or failure function has been called
std::function<void(const DownloadResult &)> success;
std::function<void(std::exception_ptr exc)> failure;
CURL * req = 0;
bool active = false; // whether the handle has been added to the multi object
std::string status;
@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader
if (requestHeaders) curl_slist_free_all(requestHeaders);
try {
if (!done)
fail(DownloadError(Transient, format("download of %s was interrupted") % request.uri));
fail(DownloadError(Interrupted, format("download of %s was interrupted") % request.uri));
} catch (...) {
ignoreException();
}
@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader
template<class T>
void fail(const T & e)
{
promise.set_exception(std::make_exception_ptr(e));
assert(!done);
done = true;
failure(std::make_exception_ptr(e));
}
size_t writeCallback(void * contents, size_t size, size_t nmemb)
@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader
(httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */))
{
result.cached = httpStatus == 304;
promise.set_value(result);
success(result);
done = true;
} else {
Error err =
@ -253,7 +255,9 @@ struct CurlDownloader : public Downloader
attempt++;
auto exc =
httpStatus != 0
code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted
? DownloadError(Interrupted, format("download of %s was interrupted") % request.uri)
: httpStatus != 0
? DownloadError(err, format("unable to download %s: HTTP error %d") % request.uri % httpStatus)
: DownloadError(err, format("unable to download %s: %s (%d)") % request.uri % curl_easy_strerror(code) % code);
@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader
{
try {
workerThreadMain();
} catch (Interrupted & e) {
} catch (nix::Interrupted & e) {
} catch (std::exception & e) {
printMsg(lvlError, format("unexpected error in download thread: %s") % e.what());
}
@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader
writeFull(wakeupPipe.writeSide.get(), " ");
}
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) override
void enqueueDownload(const DownloadRequest & request,
std::function<void(const DownloadResult &)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
auto item = std::make_shared<DownloadItem>(*this, request);
item->success = success;
item->failure = failure;
enqueueItem(item);
return item->promise.get_future();
}
};
@ -458,6 +465,15 @@ ref<Downloader> makeDownloader()
return make_ref<CurlDownloader>();
}
std::future<DownloadResult> Downloader::enqueueDownload(const DownloadRequest & request)
{
auto promise = std::make_shared<std::promise<DownloadResult>>();
enqueueDownload(request,
[promise](const DownloadResult & result) { promise->set_value(result); },
[promise](std::exception_ptr exc) { promise->set_exception(exc); });
return promise->get_future();
}
DownloadResult Downloader::download(const DownloadRequest & request)
{
return enqueueDownload(request).get();

View file

@ -23,8 +23,6 @@ struct DownloadRequest
struct DownloadResult
{
enum Status { Success, NotFound, Forbidden, Misc, Transient };
Status status;
bool cached;
std::string etag;
std::string effectiveUrl;
@ -38,7 +36,11 @@ struct Downloader
/* Enqueue a download request, returning a future to the result of
the download. The future may throw a DownloadError
exception. */
virtual std::future<DownloadResult> enqueueDownload(const DownloadRequest & request) = 0;
virtual void enqueueDownload(const DownloadRequest & request,
std::function<void(const DownloadResult &)> success,
std::function<void(std::exception_ptr exc)> failure) = 0;
std::future<DownloadResult> enqueueDownload(const DownloadRequest & request);
/* Synchronously download a file. */
DownloadResult download(const DownloadRequest & request);
@ -50,7 +52,7 @@ struct Downloader
Path downloadCached(ref<Store> store, const string & uri, bool unpack, string name = "",
const Hash & expectedHash = Hash(), string * effectiveUri = nullptr);
enum Error { NotFound, Forbidden, Misc, Transient };
enum Error { NotFound, Forbidden, Misc, Transient, Interrupted };
};
/* Return a shared Downloader object. Using this object is preferred

View file

@ -69,18 +69,27 @@ protected:
throw UploadToHTTP("uploading to an HTTP binary cache is not supported");
}
std::shared_ptr<std::string> getFile(const std::string & path) override
void getFile(const std::string & path,
std::function<void(std::shared_ptr<std::string>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
DownloadRequest request(cacheUri + "/" + path);
request.showProgress = DownloadRequest::no;
request.tries = 8;
getDownloader()->enqueueDownload(request,
[success](const DownloadResult & result) {
success(result.data);
},
[success, failure](std::exception_ptr exc) {
try {
return getDownloader()->download(request).data;
std::rethrow_exception(exc);
} catch (DownloadError & e) {
if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden)
return 0;
throw;
success(0);
failure(exc);
}
});
}
};

View file

@ -32,7 +32,19 @@ protected:
void upsertFile(const std::string & path, const std::string & data) override;
std::shared_ptr<std::string> getFile(const std::string & path) override;
void getFile(const std::string & path,
std::function<void(std::shared_ptr<std::string>)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
try {
return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path));
} catch (SysError & e) {
if (e.errNo == ENOENT) return std::shared_ptr<std::string>();
throw;
}
});
}
PathSet queryAllValidPaths() override
{
@ -76,16 +88,6 @@ void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::stri
atomicWrite(binaryCacheDir + "/" + path, data);
}
std::shared_ptr<std::string> LocalBinaryCacheStore::getFile(const std::string & path)
{
try {
return std::make_shared<std::string>(readFile(binaryCacheDir + "/" + path));
} catch (SysError & e) {
if (e.errNo == ENOENT) return 0;
throw;
}
}
static RegisterStoreImplementation regStore([](
const std::string & uri, const Store::Params & params)
-> std::shared_ptr<Store>

View file

@ -577,8 +577,12 @@ Hash parseHashField(const Path & path, const string & s)
}
std::shared_ptr<ValidPathInfo> LocalStore::queryPathInfoUncached(const Path & path)
void LocalStore::queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
auto info = std::make_shared<ValidPathInfo>();
info->path = path;
@ -621,6 +625,7 @@ std::shared_ptr<ValidPathInfo> LocalStore::queryPathInfoUncached(const Path & pa
return info;
});
});
}

View file

@ -106,7 +106,9 @@ public:
PathSet queryAllValidPaths() override;
std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
void queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path, PathSet & referrers) override;

View file

@ -8,23 +8,34 @@
namespace nix {
void Store::computeFSClosure(const Path & path,
PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers)
void Store::computeFSClosure(const Path & startPath,
PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers)
{
ThreadPool pool;
struct State
{
size_t pending;
PathSet & paths;
std::exception_ptr exc;
};
Sync<bool> state_;
Sync<State> state_(State{0, paths_, 0});
std::function<void(Path)> doPath;
std::function<void(const Path &)> enqueue;
doPath = [&](const Path & path) {
std::condition_variable done;
enqueue = [&](const Path & path) -> void {
{
auto state(state_.lock());
if (paths.count(path)) return;
paths.insert(path);
if (state->exc) return;
if (state->paths.count(path)) return;
state->paths.insert(path);
state->pending++;
}
auto info = queryPathInfo(path);
queryPathInfo(path,
[&, path](ref<ValidPathInfo> info) {
// FIXME: calls to isValidPath() should be async
if (flipDirection) {
@ -32,42 +43,55 @@ void Store::computeFSClosure(const Path & path,
queryReferrers(path, referrers);
for (auto & ref : referrers)
if (ref != path)
pool.enqueue(std::bind(doPath, ref));
enqueue(ref);
if (includeOutputs) {
PathSet derivers = queryValidDerivers(path);
for (auto & i : derivers)
pool.enqueue(std::bind(doPath, i));
}
if (includeOutputs)
for (auto & i : queryValidDerivers(path))
enqueue(i);
if (includeDerivers && isDerivation(path)) {
PathSet outputs = queryDerivationOutputs(path);
for (auto & i : outputs)
if (includeDerivers && isDerivation(path))
for (auto & i : queryDerivationOutputs(path))
if (isValidPath(i) && queryPathInfo(i)->deriver == path)
pool.enqueue(std::bind(doPath, i));
}
enqueue(i);
} else {
for (auto & ref : info->references)
if (ref != path)
pool.enqueue(std::bind(doPath, ref));
enqueue(ref);
if (includeOutputs && isDerivation(path)) {
PathSet outputs = queryDerivationOutputs(path);
for (auto & i : outputs)
if (isValidPath(i)) pool.enqueue(std::bind(doPath, i));
}
if (includeOutputs && isDerivation(path))
for (auto & i : queryDerivationOutputs(path))
if (isValidPath(i)) enqueue(i);
if (includeDerivers && isValidPath(info->deriver))
pool.enqueue(std::bind(doPath, info->deriver));
enqueue(info->deriver);
}
{
auto state(state_.lock());
assert(state->pending);
if (!--state->pending) done.notify_one();
}
},
[&, path](std::exception_ptr exc) {
auto state(state_.lock());
if (!state->exc) state->exc = exc;
assert(state->pending);
if (!--state->pending) done.notify_one();
});
};
pool.enqueue(std::bind(doPath, path));
enqueue(startPath);
pool.process();
{
auto state(state_.lock());
while (state->pending) state.wait(done);
if (state->exc) std::rethrow_exception(state->exc);
}
}

View file

@ -246,8 +246,11 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths,
}
std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & path)
void RemoteStore::queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
sync2async<std::shared_ptr<ValidPathInfo>>(success, failure, [&]() {
auto conn(connections->get());
conn->to << wopQueryPathInfo << path;
try {
@ -276,6 +279,7 @@ std::shared_ptr<ValidPathInfo> RemoteStore::queryPathInfoUncached(const Path & p
info->ca = readString(conn->from);
}
return info;
});
}

View file

@ -34,7 +34,9 @@ public:
PathSet queryAllValidPaths() override;
std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) override;
void queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure) override;
void queryReferrers(const Path & path, PathSet & referrers) override;

View file

@ -167,8 +167,11 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
stats.putTimeMs += duration;
}
std::shared_ptr<std::string> getFile(const std::string & path) override
void getFile(const std::string & path,
std::function<void(std::shared_ptr<std::string>)> success,
std::function<void(std::exception_ptr exc)> failure) override
{
sync2async<std::shared_ptr<std::string>>(success, failure, [&]() {
debug(format("fetching s3://%1%/%2%...") % bucketName % path);
auto request =
@ -204,9 +207,10 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore
return std::make_shared<std::string>(res);
} catch (S3Error & e) {
if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0;
if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr<std::string>();
throw;
}
});
}
PathSet queryAllValidPaths() override

View file

@ -4,6 +4,8 @@
#include "util.hh"
#include "nar-info-disk-cache.hh"
#include <future>
namespace nix {
@ -282,17 +284,36 @@ bool Store::isValidPath(const Path & storePath)
ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath)
{
std::promise<ref<ValidPathInfo>> promise;
queryPathInfo(storePath,
[&](ref<ValidPathInfo> info) {
promise.set_value(info);
},
[&](std::exception_ptr exc) {
promise.set_exception(exc);
});
return promise.get_future().get();
}
void Store::queryPathInfo(const Path & storePath,
std::function<void(ref<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure)
{
auto hashPart = storePathToHash(storePath);
try {
{
auto state_(state.lock());
auto res = state_->pathInfoCache.get(hashPart);
auto res = state.lock()->pathInfoCache.get(hashPart);
if (res) {
stats.narInfoReadAverted++;
if (!*res)
throw InvalidPath(format("path %s is not valid") % storePath);
return ref<ValidPathInfo>(*res);
return success(ref<ValidPathInfo>(*res));
}
}
@ -300,17 +321,24 @@ ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath)
auto res = diskCache->lookupNarInfo(getUri(), hashPart);
if (res.first != NarInfoDiskCache::oUnknown) {
stats.narInfoReadAverted++;
{
auto state_(state.lock());
state_->pathInfoCache.upsert(hashPart,
res.first == NarInfoDiskCache::oInvalid ? 0 : res.second);
if (res.first == NarInfoDiskCache::oInvalid ||
(res.second->path != storePath && storePathToName(storePath) != ""))
throw InvalidPath(format("path %s is not valid") % storePath);
return ref<ValidPathInfo>(res.second);
}
return success(ref<ValidPathInfo>(res.second));
}
}
auto info = queryPathInfoUncached(storePath);
} catch (std::exception & e) {
return callFailure(failure);
}
queryPathInfoUncached(storePath,
[this, storePath, hashPart, success, failure](std::shared_ptr<ValidPathInfo> info) {
if (diskCache)
diskCache->upsertNarInfo(getUri(), hashPart, info);
@ -324,10 +352,12 @@ ref<const ValidPathInfo> Store::queryPathInfo(const Path & storePath)
|| (info->path != storePath && storePathToName(storePath) != ""))
{
stats.narInfoMissing++;
throw InvalidPath(format("path %s is not valid") % storePath);
return failure(std::make_exception_ptr(InvalidPath(format("path %s is not valid") % storePath)));
}
return ref<ValidPathInfo>(info);
callSuccess(success, failure, ref<ValidPathInfo>(info));
}, failure);
}

View file

@ -319,9 +319,16 @@ public:
the name part of the store path. */
ref<const ValidPathInfo> queryPathInfo(const Path & path);
/* Asynchronous version of queryPathInfo(). */
void queryPathInfo(const Path & path,
std::function<void(ref<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure);
protected:
virtual std::shared_ptr<ValidPathInfo> queryPathInfoUncached(const Path & path) = 0;
virtual void queryPathInfoUncached(const Path & path,
std::function<void(std::shared_ptr<ValidPathInfo>)> success,
std::function<void(std::exception_ptr exc)> failure) = 0;
public:

View file

@ -1215,4 +1215,15 @@ string base64Decode(const string & s)
}
void callFailure(const std::function<void(std::exception_ptr exc)> & failure)
{
try {
failure(std::current_exception());
} catch (std::exception & e) {
printMsg(lvlError, format("uncaught exception: %s") % e.what());
abort();
}
}
}

View file

@ -376,4 +376,43 @@ string get(const T & map, const string & key, const string & def = "")
}
/* Call failure with the current exception as argument. If failure
throws an exception, abort the program. */
void callFailure(const std::function<void(std::exception_ptr exc)> & failure);
/* Evaluate the function f. If it returns a value, call success
with that value as its argument. If it or success throws an
exception, call failure. If failure throws an exception, abort
the program. */
template<class T>
void sync2async(
const std::function<void(T)> & success,
const std::function<void(std::exception_ptr exc)> & failure,
const std::function<T()> & f)
{
try {
success(f());
} catch (...) {
callFailure(failure);
}
}
/* Call the function success. If it throws an exception, call
failure. If that throws an exception, abort the program. */
template<class T>
void callSuccess(
const std::function<void(T)> & success,
const std::function<void(std::exception_ptr exc)> & failure,
T && arg)
{
try {
success(arg);
} catch (...) {
callFailure(failure);
}
}
}