From 75989bdca773eedb8b8d1cc8a7675900358acd25 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Fri, 16 Sep 2016 18:54:14 +0200 Subject: [PATCH] Make computeFSClosure() single-threaded again The fact that queryPathInfo() is synchronous meant that we needed a thread for every concurrent binary cache lookup, even though they end up being handled by the same download thread. Requiring hundreds of threads is not a good idea. So now there is an asynchronous version of queryPathInfo() that takes a callback function to process the result. Similarly, enqueueDownload() now takes a callback rather than returning a future. Thus, a command like nix path-info --store https://cache.nixos.org/ -r /nix/store/slljrzwmpygy1daay14kjszsr9xix063-nixos-16.09beta231.dccf8c5 that returns 4941 paths now takes 1.87s using only 2 threads (the main thread and the downloader thread). (This is with a prewarmed CloudFront.) --- src/libstore/binary-cache-store.cc | 32 +++++- src/libstore/binary-cache-store.hh | 10 +- src/libstore/download.cc | 38 +++++-- src/libstore/download.hh | 10 +- src/libstore/http-binary-cache-store.cc | 25 +++-- src/libstore/local-binary-cache-store.cc | 24 ++-- src/libstore/local-store.cc | 59 +++++----- src/libstore/local-store.hh | 4 +- src/libstore/misc.cc | 136 +++++++++++++---------- src/libstore/remote-store.cc | 62 ++++++----- src/libstore/remote-store.hh | 4 +- src/libstore/s3-binary-cache-store.cc | 76 +++++++------ src/libstore/store-api.cc | 98 ++++++++++------ src/libstore/store-api.hh | 9 +- src/libutil/util.cc | 11 ++ src/libutil/util.hh | 39 +++++++ 16 files changed, 410 insertions(+), 227 deletions(-) diff --git a/src/libstore/binary-cache-store.cc b/src/libstore/binary-cache-store.cc index e71ea6a57..0ffbd6e55 100644 --- a/src/libstore/binary-cache-store.cc +++ b/src/libstore/binary-cache-store.cc @@ -12,6 +12,8 @@ #include +#include + namespace nix { BinaryCacheStore::BinaryCacheStore(const Params & params) @@ -58,6 +60,19 @@ void BinaryCacheStore::notImpl() throw Error("operation not implemented for binary cache stores"); } +std::shared_ptr BinaryCacheStore::getFile(const std::string & path) +{ + std::promise> promise; + getFile(path, + [&](std::shared_ptr result) { + promise.set_value(result); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + return promise.get_future().get(); +} + Path BinaryCacheStore::narInfoFileFor(const Path & storePath) { assertStorePath(storePath); @@ -176,17 +191,22 @@ void BinaryCacheStore::narFromPath(const Path & storePath, Sink & sink) sink((unsigned char *) nar->c_str(), nar->size()); } -std::shared_ptr BinaryCacheStore::queryPathInfoUncached(const Path & storePath) +void BinaryCacheStore::queryPathInfoUncached(const Path & storePath, + std::function)> success, + std::function failure) { auto narInfoFile = narInfoFileFor(storePath); - auto data = getFile(narInfoFile); - if (!data) return 0; - auto narInfo = make_ref(*this, *data, narInfoFile); + getFile(narInfoFile, + [=](std::shared_ptr data) { + if (!data) return success(0); - stats.narInfoRead++; + stats.narInfoRead++; - return std::shared_ptr(narInfo); + callSuccess(success, failure, (std::shared_ptr) + std::make_shared(*this, *data, narInfoFile)); + }, + failure); } Path BinaryCacheStore::addToStore(const string & name, const Path & srcPath, diff --git a/src/libstore/binary-cache-store.hh b/src/libstore/binary-cache-store.hh index 2d10179f3..41671b7d9 100644 --- a/src/libstore/binary-cache-store.hh +++ b/src/libstore/binary-cache-store.hh @@ -31,7 +31,11 @@ protected: /* Return the contents of the specified file, or null if it doesn't exist. */ - virtual std::shared_ptr getFile(const std::string & path) = 0; + virtual void getFile(const std::string & path, + std::function)> success, + std::function failure) = 0; + + std::shared_ptr getFile(const std::string & path); bool wantMassQuery_ = false; int priority = 50; @@ -56,7 +60,9 @@ public: PathSet queryAllValidPaths() override { notImpl(); } - std::shared_ptr queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) override; void queryReferrers(const Path & path, PathSet & referrers) override diff --git a/src/libstore/download.cc b/src/libstore/download.cc index b2d223da9..ca324595a 100644 --- a/src/libstore/download.cc +++ b/src/libstore/download.cc @@ -47,8 +47,9 @@ struct CurlDownloader : public Downloader CurlDownloader & downloader; DownloadRequest request; DownloadResult result; - bool done = false; // whether the promise has been set - std::promise promise; + bool done = false; // whether either the success or failure function has been called + std::function success; + std::function failure; CURL * req = 0; bool active = false; // whether the handle has been added to the multi object std::string status; @@ -86,7 +87,7 @@ struct CurlDownloader : public Downloader if (requestHeaders) curl_slist_free_all(requestHeaders); try { if (!done) - fail(DownloadError(Transient, format("download of ‘%s’ was interrupted") % request.uri)); + fail(DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri)); } catch (...) { ignoreException(); } @@ -95,8 +96,9 @@ struct CurlDownloader : public Downloader template void fail(const T & e) { - promise.set_exception(std::make_exception_ptr(e)); + assert(!done); done = true; + failure(std::make_exception_ptr(e)); } size_t writeCallback(void * contents, size_t size, size_t nmemb) @@ -239,7 +241,7 @@ struct CurlDownloader : public Downloader (httpStatus == 200 || httpStatus == 304 || httpStatus == 226 /* FTP */ || httpStatus == 0 /* other protocol */)) { result.cached = httpStatus == 304; - promise.set_value(result); + success(result); done = true; } else { Error err = @@ -253,9 +255,11 @@ struct CurlDownloader : public Downloader attempt++; auto exc = - httpStatus != 0 - ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) - : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); + code == CURLE_ABORTED_BY_CALLBACK && _isInterrupted + ? DownloadError(Interrupted, format("download of ‘%s’ was interrupted") % request.uri) + : httpStatus != 0 + ? DownloadError(err, format("unable to download ‘%s’: HTTP error %d") % request.uri % httpStatus) + : DownloadError(err, format("unable to download ‘%s’: %s (%d)") % request.uri % curl_easy_strerror(code) % code); /* If this is a transient error, then maybe retry the download after a while. */ @@ -414,7 +418,7 @@ struct CurlDownloader : public Downloader { try { workerThreadMain(); - } catch (Interrupted & e) { + } catch (nix::Interrupted & e) { } catch (std::exception & e) { printMsg(lvlError, format("unexpected error in download thread: %s") % e.what()); } @@ -437,11 +441,14 @@ struct CurlDownloader : public Downloader writeFull(wakeupPipe.writeSide.get(), " "); } - std::future enqueueDownload(const DownloadRequest & request) override + void enqueueDownload(const DownloadRequest & request, + std::function success, + std::function failure) override { auto item = std::make_shared(*this, request); + item->success = success; + item->failure = failure; enqueueItem(item); - return item->promise.get_future(); } }; @@ -458,6 +465,15 @@ ref makeDownloader() return make_ref(); } +std::future Downloader::enqueueDownload(const DownloadRequest & request) +{ + auto promise = std::make_shared>(); + enqueueDownload(request, + [promise](const DownloadResult & result) { promise->set_value(result); }, + [promise](std::exception_ptr exc) { promise->set_exception(exc); }); + return promise->get_future(); +} + DownloadResult Downloader::download(const DownloadRequest & request) { return enqueueDownload(request).get(); diff --git a/src/libstore/download.hh b/src/libstore/download.hh index 6b90ff202..82b5d641f 100644 --- a/src/libstore/download.hh +++ b/src/libstore/download.hh @@ -23,8 +23,6 @@ struct DownloadRequest struct DownloadResult { - enum Status { Success, NotFound, Forbidden, Misc, Transient }; - Status status; bool cached; std::string etag; std::string effectiveUrl; @@ -38,7 +36,11 @@ struct Downloader /* Enqueue a download request, returning a future to the result of the download. The future may throw a DownloadError exception. */ - virtual std::future enqueueDownload(const DownloadRequest & request) = 0; + virtual void enqueueDownload(const DownloadRequest & request, + std::function success, + std::function failure) = 0; + + std::future enqueueDownload(const DownloadRequest & request); /* Synchronously download a file. */ DownloadResult download(const DownloadRequest & request); @@ -50,7 +52,7 @@ struct Downloader Path downloadCached(ref store, const string & uri, bool unpack, string name = "", const Hash & expectedHash = Hash(), string * effectiveUri = nullptr); - enum Error { NotFound, Forbidden, Misc, Transient }; + enum Error { NotFound, Forbidden, Misc, Transient, Interrupted }; }; /* Return a shared Downloader object. Using this object is preferred diff --git a/src/libstore/http-binary-cache-store.cc b/src/libstore/http-binary-cache-store.cc index 91ee6fcb6..60728de04 100644 --- a/src/libstore/http-binary-cache-store.cc +++ b/src/libstore/http-binary-cache-store.cc @@ -69,18 +69,27 @@ protected: throw UploadToHTTP("uploading to an HTTP binary cache is not supported"); } - std::shared_ptr getFile(const std::string & path) override + void getFile(const std::string & path, + std::function)> success, + std::function failure) { DownloadRequest request(cacheUri + "/" + path); request.showProgress = DownloadRequest::no; request.tries = 8; - try { - return getDownloader()->download(request).data; - } catch (DownloadError & e) { - if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) - return 0; - throw; - } + + getDownloader()->enqueueDownload(request, + [success](const DownloadResult & result) { + success(result.data); + }, + [success, failure](std::exception_ptr exc) { + try { + std::rethrow_exception(exc); + } catch (DownloadError & e) { + if (e.error == Downloader::NotFound || e.error == Downloader::Forbidden) + success(0); + failure(exc); + } + }); } }; diff --git a/src/libstore/local-binary-cache-store.cc b/src/libstore/local-binary-cache-store.cc index 91d2650fe..0f377989b 100644 --- a/src/libstore/local-binary-cache-store.cc +++ b/src/libstore/local-binary-cache-store.cc @@ -32,7 +32,19 @@ protected: void upsertFile(const std::string & path, const std::string & data) override; - std::shared_ptr getFile(const std::string & path) override; + void getFile(const std::string & path, + std::function)> success, + std::function failure) override + { + sync2async>(success, failure, [&]() { + try { + return std::make_shared(readFile(binaryCacheDir + "/" + path)); + } catch (SysError & e) { + if (e.errNo == ENOENT) return std::shared_ptr(); + throw; + } + }); + } PathSet queryAllValidPaths() override { @@ -76,16 +88,6 @@ void LocalBinaryCacheStore::upsertFile(const std::string & path, const std::stri atomicWrite(binaryCacheDir + "/" + path, data); } -std::shared_ptr LocalBinaryCacheStore::getFile(const std::string & path) -{ - try { - return std::make_shared(readFile(binaryCacheDir + "/" + path)); - } catch (SysError & e) { - if (e.errNo == ENOENT) return 0; - throw; - } -} - static RegisterStoreImplementation regStore([]( const std::string & uri, const Store::Params & params) -> std::shared_ptr diff --git a/src/libstore/local-store.cc b/src/libstore/local-store.cc index 10056f2f1..466cea727 100644 --- a/src/libstore/local-store.cc +++ b/src/libstore/local-store.cc @@ -577,49 +577,54 @@ Hash parseHashField(const Path & path, const string & s) } -std::shared_ptr LocalStore::queryPathInfoUncached(const Path & path) +void LocalStore::queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) { - auto info = std::make_shared(); - info->path = path; + sync2async>(success, failure, [&]() { - assertStorePath(path); + auto info = std::make_shared(); + info->path = path; - return retrySQLite>([&]() { - auto state(_state.lock()); + assertStorePath(path); - /* Get the path info. */ - auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); + return retrySQLite>([&]() { + auto state(_state.lock()); - if (!useQueryPathInfo.next()) - return std::shared_ptr(); + /* Get the path info. */ + auto useQueryPathInfo(state->stmtQueryPathInfo.use()(path)); - info->id = useQueryPathInfo.getInt(0); + if (!useQueryPathInfo.next()) + return std::shared_ptr(); - info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); + info->id = useQueryPathInfo.getInt(0); - info->registrationTime = useQueryPathInfo.getInt(2); + info->narHash = parseHashField(path, useQueryPathInfo.getStr(1)); - auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); - if (s) info->deriver = s; + info->registrationTime = useQueryPathInfo.getInt(2); - /* Note that narSize = NULL yields 0. */ - info->narSize = useQueryPathInfo.getInt(4); + auto s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 3); + if (s) info->deriver = s; - info->ultimate = useQueryPathInfo.getInt(5) == 1; + /* Note that narSize = NULL yields 0. */ + info->narSize = useQueryPathInfo.getInt(4); - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); - if (s) info->sigs = tokenizeString(s, " "); + info->ultimate = useQueryPathInfo.getInt(5) == 1; - s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); - if (s) info->ca = s; + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 6); + if (s) info->sigs = tokenizeString(s, " "); - /* Get the references. */ - auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); + s = (const char *) sqlite3_column_text(state->stmtQueryPathInfo, 7); + if (s) info->ca = s; - while (useQueryReferences.next()) - info->references.insert(useQueryReferences.getStr(0)); + /* Get the references. */ + auto useQueryReferences(state->stmtQueryReferences.use()(info->id)); - return info; + while (useQueryReferences.next()) + info->references.insert(useQueryReferences.getStr(0)); + + return info; + }); }); } diff --git a/src/libstore/local-store.hh b/src/libstore/local-store.hh index 5b5960cf2..24188130d 100644 --- a/src/libstore/local-store.hh +++ b/src/libstore/local-store.hh @@ -106,7 +106,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/misc.cc b/src/libstore/misc.cc index da654ba0d..0c2c49e55 100644 --- a/src/libstore/misc.cc +++ b/src/libstore/misc.cc @@ -8,66 +8,90 @@ namespace nix { -void Store::computeFSClosure(const Path & path, - PathSet & paths, bool flipDirection, bool includeOutputs, bool includeDerivers) +void Store::computeFSClosure(const Path & startPath, + PathSet & paths_, bool flipDirection, bool includeOutputs, bool includeDerivers) { - ThreadPool pool; - - Sync state_; - - std::function doPath; - - doPath = [&](const Path & path) { - { - auto state(state_.lock()); - if (paths.count(path)) return; - paths.insert(path); - } - - auto info = queryPathInfo(path); - - if (flipDirection) { - - PathSet referrers; - queryReferrers(path, referrers); - for (auto & ref : referrers) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); - - if (includeOutputs) { - PathSet derivers = queryValidDerivers(path); - for (auto & i : derivers) - pool.enqueue(std::bind(doPath, i)); - } - - if (includeDerivers && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i) && queryPathInfo(i)->deriver == path) - pool.enqueue(std::bind(doPath, i)); - } - - } else { - - for (auto & ref : info->references) - if (ref != path) - pool.enqueue(std::bind(doPath, ref)); - - if (includeOutputs && isDerivation(path)) { - PathSet outputs = queryDerivationOutputs(path); - for (auto & i : outputs) - if (isValidPath(i)) pool.enqueue(std::bind(doPath, i)); - } - - if (includeDerivers && isValidPath(info->deriver)) - pool.enqueue(std::bind(doPath, info->deriver)); - - } + struct State + { + size_t pending; + PathSet & paths; + std::exception_ptr exc; }; - pool.enqueue(std::bind(doPath, path)); + Sync state_(State{0, paths_, 0}); - pool.process(); + std::function enqueue; + + std::condition_variable done; + + enqueue = [&](const Path & path) -> void { + { + auto state(state_.lock()); + if (state->exc) return; + if (state->paths.count(path)) return; + state->paths.insert(path); + state->pending++; + } + + queryPathInfo(path, + [&, path](ref info) { + // FIXME: calls to isValidPath() should be async + + if (flipDirection) { + + PathSet referrers; + queryReferrers(path, referrers); + for (auto & ref : referrers) + if (ref != path) + enqueue(ref); + + if (includeOutputs) + for (auto & i : queryValidDerivers(path)) + enqueue(i); + + if (includeDerivers && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i) && queryPathInfo(i)->deriver == path) + enqueue(i); + + } else { + + for (auto & ref : info->references) + if (ref != path) + enqueue(ref); + + if (includeOutputs && isDerivation(path)) + for (auto & i : queryDerivationOutputs(path)) + if (isValidPath(i)) enqueue(i); + + if (includeDerivers && isValidPath(info->deriver)) + enqueue(info->deriver); + + } + + { + auto state(state_.lock()); + assert(state->pending); + if (!--state->pending) done.notify_one(); + } + + }, + + [&, path](std::exception_ptr exc) { + auto state(state_.lock()); + if (!state->exc) state->exc = exc; + assert(state->pending); + if (!--state->pending) done.notify_one(); + }); + }; + + enqueue(startPath); + + { + auto state(state_.lock()); + while (state->pending) state.wait(done); + if (state->exc) std::rethrow_exception(state->exc); + } } diff --git a/src/libstore/remote-store.cc b/src/libstore/remote-store.cc index 94075f3b9..7b73557a5 100644 --- a/src/libstore/remote-store.cc +++ b/src/libstore/remote-store.cc @@ -246,36 +246,40 @@ void RemoteStore::querySubstitutablePathInfos(const PathSet & paths, } -std::shared_ptr RemoteStore::queryPathInfoUncached(const Path & path) +void RemoteStore::queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) { - auto conn(connections->get()); - conn->to << wopQueryPathInfo << path; - try { - conn->processStderr(); - } catch (Error & e) { - // Ugly backwards compatibility hack. - if (e.msg().find("is not valid") != std::string::npos) - throw InvalidPath(e.what()); - throw; - } - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { - bool valid = readInt(conn->from) != 0; - if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); - } - auto info = std::make_shared(); - info->path = path; - info->deriver = readString(conn->from); - if (info->deriver != "") assertStorePath(info->deriver); - info->narHash = parseHash(htSHA256, readString(conn->from)); - info->references = readStorePaths(*this, conn->from); - info->registrationTime = readInt(conn->from); - info->narSize = readLongLong(conn->from); - if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { - info->ultimate = readInt(conn->from) != 0; - info->sigs = readStrings(conn->from); - info->ca = readString(conn->from); - } - return info; + sync2async>(success, failure, [&]() { + auto conn(connections->get()); + conn->to << wopQueryPathInfo << path; + try { + conn->processStderr(); + } catch (Error & e) { + // Ugly backwards compatibility hack. + if (e.msg().find("is not valid") != std::string::npos) + throw InvalidPath(e.what()); + throw; + } + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 17) { + bool valid = readInt(conn->from) != 0; + if (!valid) throw InvalidPath(format("path ‘%s’ is not valid") % path); + } + auto info = std::make_shared(); + info->path = path; + info->deriver = readString(conn->from); + if (info->deriver != "") assertStorePath(info->deriver); + info->narHash = parseHash(htSHA256, readString(conn->from)); + info->references = readStorePaths(*this, conn->from); + info->registrationTime = readInt(conn->from); + info->narSize = readLongLong(conn->from); + if (GET_PROTOCOL_MINOR(conn->daemonVersion) >= 16) { + info->ultimate = readInt(conn->from) != 0; + info->sigs = readStrings(conn->from); + info->ca = readString(conn->from); + } + return info; + }); } diff --git a/src/libstore/remote-store.hh b/src/libstore/remote-store.hh index e756805ea..9879337d6 100644 --- a/src/libstore/remote-store.hh +++ b/src/libstore/remote-store.hh @@ -34,7 +34,9 @@ public: PathSet queryAllValidPaths() override; - std::shared_ptr queryPathInfoUncached(const Path & path) override; + void queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) override; void queryReferrers(const Path & path, PathSet & referrers) override; diff --git a/src/libstore/s3-binary-cache-store.cc b/src/libstore/s3-binary-cache-store.cc index ed95620bb..0722e43d5 100644 --- a/src/libstore/s3-binary-cache-store.cc +++ b/src/libstore/s3-binary-cache-store.cc @@ -167,46 +167,50 @@ struct S3BinaryCacheStoreImpl : public S3BinaryCacheStore stats.putTimeMs += duration; } - std::shared_ptr getFile(const std::string & path) override + void getFile(const std::string & path, + std::function)> success, + std::function failure) override { - debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); + sync2async>(success, failure, [&]() { + debug(format("fetching ‘s3://%1%/%2%’...") % bucketName % path); - auto request = - Aws::S3::Model::GetObjectRequest() - .WithBucket(bucketName) - .WithKey(path); + auto request = + Aws::S3::Model::GetObjectRequest() + .WithBucket(bucketName) + .WithKey(path); - request.SetResponseStreamFactory([&]() { - return Aws::New("STRINGSTREAM"); + request.SetResponseStreamFactory([&]() { + return Aws::New("STRINGSTREAM"); + }); + + stats.get++; + + try { + + auto now1 = std::chrono::steady_clock::now(); + + auto result = checkAws(format("AWS error fetching ‘%s’") % path, + client->GetObject(request)); + + auto now2 = std::chrono::steady_clock::now(); + + auto res = dynamic_cast(result.GetBody()).str(); + + auto duration = std::chrono::duration_cast(now2 - now1).count(); + + printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") + % bucketName % path % res.size() % duration); + + stats.getBytes += res.size(); + stats.getTimeMs += duration; + + return std::make_shared(res); + + } catch (S3Error & e) { + if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return std::shared_ptr(); + throw; + } }); - - stats.get++; - - try { - - auto now1 = std::chrono::steady_clock::now(); - - auto result = checkAws(format("AWS error fetching ‘%s’") % path, - client->GetObject(request)); - - auto now2 = std::chrono::steady_clock::now(); - - auto res = dynamic_cast(result.GetBody()).str(); - - auto duration = std::chrono::duration_cast(now2 - now1).count(); - - printMsg(lvlTalkative, format("downloaded ‘s3://%1%/%2%’ (%3% bytes) in %4% ms") - % bucketName % path % res.size() % duration); - - stats.getBytes += res.size(); - stats.getTimeMs += duration; - - return std::make_shared(res); - - } catch (S3Error & e) { - if (e.err == Aws::S3::S3Errors::NO_SUCH_KEY) return 0; - throw; - } } PathSet queryAllValidPaths() override diff --git a/src/libstore/store-api.cc b/src/libstore/store-api.cc index 5dd56f905..427170843 100644 --- a/src/libstore/store-api.cc +++ b/src/libstore/store-api.cc @@ -4,6 +4,8 @@ #include "util.hh" #include "nar-info-disk-cache.hh" +#include + namespace nix { @@ -282,52 +284,80 @@ bool Store::isValidPath(const Path & storePath) ref Store::queryPathInfo(const Path & storePath) +{ + std::promise> promise; + + queryPathInfo(storePath, + [&](ref info) { + promise.set_value(info); + }, + [&](std::exception_ptr exc) { + promise.set_exception(exc); + }); + + return promise.get_future().get(); +} + + +void Store::queryPathInfo(const Path & storePath, + std::function)> success, + std::function failure) { auto hashPart = storePathToHash(storePath); - { - auto state_(state.lock()); - auto res = state_->pathInfoCache.get(hashPart); - if (res) { - stats.narInfoReadAverted++; - if (!*res) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref(*res); + try { + + { + auto res = state.lock()->pathInfoCache.get(hashPart); + if (res) { + stats.narInfoReadAverted++; + if (!*res) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + return success(ref(*res)); + } } - } - if (diskCache) { - auto res = diskCache->lookupNarInfo(getUri(), hashPart); - if (res.first != NarInfoDiskCache::oUnknown) { - stats.narInfoReadAverted++; - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, - res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); - if (res.first == NarInfoDiskCache::oInvalid || - (res.second->path != storePath && storePathToName(storePath) != "")) - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - return ref(res.second); + if (diskCache) { + auto res = diskCache->lookupNarInfo(getUri(), hashPart); + if (res.first != NarInfoDiskCache::oUnknown) { + stats.narInfoReadAverted++; + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, + res.first == NarInfoDiskCache::oInvalid ? 0 : res.second); + if (res.first == NarInfoDiskCache::oInvalid || + (res.second->path != storePath && storePathToName(storePath) != "")) + throw InvalidPath(format("path ‘%s’ is not valid") % storePath); + } + return success(ref(res.second)); + } } + + } catch (std::exception & e) { + return callFailure(failure); } - auto info = queryPathInfoUncached(storePath); + queryPathInfoUncached(storePath, + [this, storePath, hashPart, success, failure](std::shared_ptr info) { - if (diskCache) - diskCache->upsertNarInfo(getUri(), hashPart, info); + if (diskCache) + diskCache->upsertNarInfo(getUri(), hashPart, info); - { - auto state_(state.lock()); - state_->pathInfoCache.upsert(hashPart, info); - } + { + auto state_(state.lock()); + state_->pathInfoCache.upsert(hashPart, info); + } - if (!info - || (info->path != storePath && storePathToName(storePath) != "")) - { - stats.narInfoMissing++; - throw InvalidPath(format("path ‘%s’ is not valid") % storePath); - } + if (!info + || (info->path != storePath && storePathToName(storePath) != "")) + { + stats.narInfoMissing++; + return failure(std::make_exception_ptr(InvalidPath(format("path ‘%s’ is not valid") % storePath))); + } - return ref(info); + callSuccess(success, failure, ref(info)); + + }, failure); } diff --git a/src/libstore/store-api.hh b/src/libstore/store-api.hh index 41fc58fc4..cba4deaad 100644 --- a/src/libstore/store-api.hh +++ b/src/libstore/store-api.hh @@ -319,9 +319,16 @@ public: the name part of the store path. */ ref queryPathInfo(const Path & path); + /* Asynchronous version of queryPathInfo(). */ + void queryPathInfo(const Path & path, + std::function)> success, + std::function failure); + protected: - virtual std::shared_ptr queryPathInfoUncached(const Path & path) = 0; + virtual void queryPathInfoUncached(const Path & path, + std::function)> success, + std::function failure) = 0; public: diff --git a/src/libutil/util.cc b/src/libutil/util.cc index 8e029fb48..1750e0373 100644 --- a/src/libutil/util.cc +++ b/src/libutil/util.cc @@ -1215,4 +1215,15 @@ string base64Decode(const string & s) } +void callFailure(const std::function & failure) +{ + try { + failure(std::current_exception()); + } catch (std::exception & e) { + printMsg(lvlError, format("uncaught exception: %s") % e.what()); + abort(); + } +} + + } diff --git a/src/libutil/util.hh b/src/libutil/util.hh index 9bf548326..182a38fb3 100644 --- a/src/libutil/util.hh +++ b/src/libutil/util.hh @@ -376,4 +376,43 @@ string get(const T & map, const string & key, const string & def = "") } +/* Call ‘failure’ with the current exception as argument. If ‘failure’ + throws an exception, abort the program. */ +void callFailure(const std::function & failure); + + +/* Evaluate the function ‘f’. If it returns a value, call ‘success’ + with that value as its argument. If it or ‘success’ throws an + exception, call ‘failure’. If ‘failure’ throws an exception, abort + the program. */ +template +void sync2async( + const std::function & success, + const std::function & failure, + const std::function & f) +{ + try { + success(f()); + } catch (...) { + callFailure(failure); + } +} + + +/* Call the function ‘success’. If it throws an exception, call + ‘failure’. If that throws an exception, abort the program. */ +template +void callSuccess( + const std::function & success, + const std::function & failure, + T && arg) +{ + try { + success(arg); + } catch (...) { + callFailure(failure); + } +} + + }