Merge pull request #5472 from NixOS/async-realisation-substitution

async realisation substitution
This commit is contained in:
Eelco Dolstra 2021-11-16 12:54:20 +01:00 committed by GitHub
commit 6463eaca14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 204 additions and 65 deletions

View file

@ -988,8 +988,9 @@ static void prim_derivationStrict(EvalState & state, const Pos & pos, Value * *
} }
if (i->name == state.sContentAddressed) { if (i->name == state.sContentAddressed) {
settings.requireExperimentalFeature(Xp::CaDerivations);
contentAddressed = state.forceBool(*i->value, pos); contentAddressed = state.forceBool(*i->value, pos);
if (contentAddressed)
settings.requireExperimentalFeature(Xp::CaDerivations);
} }
/* The `args' attribute is special: it supplies the /* The `args' attribute is special: it supplies the

View file

@ -439,40 +439,29 @@ StorePath BinaryCacheStore::addTextToStore(const string & name, const string & s
})->path; })->path;
} }
std::optional<const Realisation> BinaryCacheStore::queryRealisation(const DrvOutput & id) void BinaryCacheStore::queryRealisationUncached(const DrvOutput & id,
Callback<std::shared_ptr<const Realisation>> callback) noexcept
{ {
if (diskCache) {
auto [cacheOutcome, maybeCachedRealisation] =
diskCache->lookupRealisation(getUri(), id);
switch (cacheOutcome) {
case NarInfoDiskCache::oValid:
debug("Returning a cached realisation for %s", id.to_string());
return *maybeCachedRealisation;
case NarInfoDiskCache::oInvalid:
debug("Returning a cached missing realisation for %s", id.to_string());
return {};
case NarInfoDiskCache::oUnknown:
break;
}
}
auto outputInfoFilePath = realisationsPrefix + "/" + id.to_string() + ".doi"; auto outputInfoFilePath = realisationsPrefix + "/" + id.to_string() + ".doi";
auto rawOutputInfo = getFile(outputInfoFilePath);
if (rawOutputInfo) { auto callbackPtr = std::make_shared<decltype(callback)>(std::move(callback));
auto realisation = Realisation::fromJSON(
nlohmann::json::parse(*rawOutputInfo), outputInfoFilePath);
if (diskCache) Callback<std::shared_ptr<std::string>> newCallback = {
diskCache->upsertRealisation( [=](std::future<std::shared_ptr<std::string>> fut) {
getUri(), realisation); try {
auto data = fut.get();
if (!data) return (*callbackPtr)(nullptr);
return {realisation}; auto realisation = Realisation::fromJSON(
} else { nlohmann::json::parse(*data), outputInfoFilePath);
if (diskCache) return (*callbackPtr)(std::make_shared<const Realisation>(realisation));
diskCache->upsertAbsentRealisation(getUri(), id); } catch (...) {
return std::nullopt; callbackPtr->rethrow();
} }
}
};
getFile(outputInfoFilePath, std::move(newCallback));
} }
void BinaryCacheStore::registerDrvOutput(const Realisation& info) { void BinaryCacheStore::registerDrvOutput(const Realisation& info) {

View file

@ -108,7 +108,8 @@ public:
void registerDrvOutput(const Realisation & info) override; void registerDrvOutput(const Realisation & info) override;
std::optional<const Realisation> queryRealisation(const DrvOutput &) override; void queryRealisationUncached(const DrvOutput &,
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
void narFromPath(const StorePath & path, Sink & sink) override; void narFromPath(const StorePath & path, Sink & sink) override;

View file

@ -1,6 +1,8 @@
#include "drv-output-substitution-goal.hh" #include "drv-output-substitution-goal.hh"
#include "finally.hh"
#include "worker.hh" #include "worker.hh"
#include "substitution-goal.hh" #include "substitution-goal.hh"
#include "callback.hh"
namespace nix { namespace nix {
@ -50,14 +52,42 @@ void DrvOutputSubstitutionGoal::tryNext()
return; return;
} }
auto sub = subs.front(); sub = subs.front();
subs.pop_front(); subs.pop_front();
// FIXME: Make async // FIXME: Make async
outputInfo = sub->queryRealisation(id); // outputInfo = sub->queryRealisation(id);
outPipe.create();
promise = decltype(promise)();
sub->queryRealisation(
id, { [&](std::future<std::shared_ptr<const Realisation>> res) {
try {
Finally updateStats([this]() { outPipe.writeSide.close(); });
promise.set_value(res.get());
} catch (...) {
promise.set_exception(std::current_exception());
}
} });
worker.childStarted(shared_from_this(), {outPipe.readSide.get()}, true, false);
state = &DrvOutputSubstitutionGoal::realisationFetched;
}
void DrvOutputSubstitutionGoal::realisationFetched()
{
worker.childTerminated(this);
try {
outputInfo = promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
substituterFailed = true;
}
if (!outputInfo) { if (!outputInfo) {
tryNext(); return tryNext();
return;
} }
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
@ -119,4 +149,10 @@ void DrvOutputSubstitutionGoal::work()
(this->*state)(); (this->*state)();
} }
void DrvOutputSubstitutionGoal::handleEOF(int fd)
{
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
} }

View file

@ -3,6 +3,8 @@
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "realisation.hh" #include "realisation.hh"
#include <thread>
#include <future>
namespace nix { namespace nix {
@ -20,11 +22,18 @@ private:
// The realisation corresponding to the given output id. // The realisation corresponding to the given output id.
// Will be filled once we can get it. // Will be filled once we can get it.
std::optional<Realisation> outputInfo; std::shared_ptr<const Realisation> outputInfo;
/* The remaining substituters. */ /* The remaining substituters. */
std::list<ref<Store>> subs; std::list<ref<Store>> subs;
/* The current substituter. */
std::shared_ptr<Store> sub;
Pipe outPipe;
std::thread thr;
std::promise<std::shared_ptr<const Realisation>> promise;
/* Whether a substituter failed. */ /* Whether a substituter failed. */
bool substituterFailed = false; bool substituterFailed = false;
@ -36,6 +45,7 @@ public:
void init(); void init();
void tryNext(); void tryNext();
void realisationFetched();
void outPathValid(); void outPathValid();
void finished(); void finished();
@ -44,7 +54,7 @@ public:
string key() override; string key() override;
void work() override; void work() override;
void handleEOF(int fd) override;
}; };
} }

View file

@ -1226,13 +1226,14 @@ struct RestrictedStore : public virtual RestrictedStoreConfig, public virtual Lo
// corresponds to an allowed derivation // corresponds to an allowed derivation
{ throw Error("registerDrvOutput"); } { throw Error("registerDrvOutput"); }
std::optional<const Realisation> queryRealisation(const DrvOutput & id) override void queryRealisationUncached(const DrvOutput & id,
Callback<std::shared_ptr<const Realisation>> callback) noexcept override
// XXX: This should probably be allowed if the realisation corresponds to // XXX: This should probably be allowed if the realisation corresponds to
// an allowed derivation // an allowed derivation
{ {
if (!goal.isAllowed(id)) if (!goal.isAllowed(id))
throw InvalidPath("cannot query an unknown output id '%s' in recursive Nix", id.to_string()); callback(nullptr);
return next->queryRealisation(id); next->queryRealisation(id, std::move(callback));
} }
void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override

View file

@ -50,8 +50,9 @@ struct DummyStore : public virtual DummyStoreConfig, public virtual Store
void narFromPath(const StorePath & path, Sink & sink) override void narFromPath(const StorePath & path, Sink & sink) override
{ unsupported("narFromPath"); } { unsupported("narFromPath"); }
std::optional<const Realisation> queryRealisation(const DrvOutput&) override void queryRealisationUncached(const DrvOutput &,
{ unsupported("queryRealisation"); } Callback<std::shared_ptr<const Realisation>> callback) noexcept override
{ callback(nullptr); }
}; };
static RegisterStoreImplementation<DummyStore, DummyStoreConfig> regDummyStore; static RegisterStoreImplementation<DummyStore, DummyStoreConfig> regDummyStore;

View file

@ -367,7 +367,8 @@ public:
return conn->remoteVersion; return conn->remoteVersion;
} }
std::optional<const Realisation> queryRealisation(const DrvOutput&) override void queryRealisationUncached(const DrvOutput &,
Callback<std::shared_ptr<const Realisation>> callback) noexcept override
// TODO: Implement // TODO: Implement
{ unsupported("queryRealisation"); } { unsupported("queryRealisation"); }
}; };

View file

@ -1836,13 +1836,24 @@ std::optional<const Realisation> LocalStore::queryRealisation_(
return { res }; return { res };
} }
std::optional<const Realisation> void LocalStore::queryRealisationUncached(const DrvOutput & id,
LocalStore::queryRealisation(const DrvOutput & id) Callback<std::shared_ptr<const Realisation>> callback) noexcept
{ {
return retrySQLite<std::optional<const Realisation>>([&]() { try {
auto state(_state.lock()); auto maybeRealisation
return queryRealisation_(*state, id); = retrySQLite<std::optional<const Realisation>>([&]() {
}); auto state(_state.lock());
return queryRealisation_(*state, id);
});
if (maybeRealisation)
callback(
std::make_shared<const Realisation>(maybeRealisation.value()));
else
callback(nullptr);
} catch (...) {
callback.rethrow();
}
} }
FixedOutputHash LocalStore::hashCAPath( FixedOutputHash LocalStore::hashCAPath(

View file

@ -207,7 +207,8 @@ public:
std::optional<const Realisation> queryRealisation_(State & state, const DrvOutput & id); std::optional<const Realisation> queryRealisation_(State & state, const DrvOutput & id);
std::optional<std::pair<int64_t, Realisation>> queryRealisationCore_(State & state, const DrvOutput & id); std::optional<std::pair<int64_t, Realisation>> queryRealisationCore_(State & state, const DrvOutput & id);
std::optional<const Realisation> queryRealisation(const DrvOutput&) override; void queryRealisationUncached(const DrvOutput&,
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
private: private:

View file

@ -680,23 +680,33 @@ void RemoteStore::registerDrvOutput(const Realisation & info)
conn.processStderr(); conn.processStderr();
} }
std::optional<const Realisation> RemoteStore::queryRealisation(const DrvOutput & id) void RemoteStore::queryRealisationUncached(const DrvOutput & id,
Callback<std::shared_ptr<const Realisation>> callback) noexcept
{ {
auto conn(getConnection()); auto conn(getConnection());
conn->to << wopQueryRealisation; conn->to << wopQueryRealisation;
conn->to << id.to_string(); conn->to << id.to_string();
conn.processStderr(); conn.processStderr();
if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
auto outPaths = worker_proto::read(*this, conn->from, Phantom<std::set<StorePath>>{}); auto real = [&]() -> std::shared_ptr<const Realisation> {
if (outPaths.empty()) if (GET_PROTOCOL_MINOR(conn->daemonVersion) < 31) {
return std::nullopt; auto outPaths = worker_proto::read(
return {Realisation{.id = id, .outPath = *outPaths.begin()}}; *this, conn->from, Phantom<std::set<StorePath>> {});
} else { if (outPaths.empty())
auto realisations = worker_proto::read(*this, conn->from, Phantom<std::set<Realisation>>{}); return nullptr;
if (realisations.empty()) return std::make_shared<const Realisation>(Realisation { .id = id, .outPath = *outPaths.begin() });
return std::nullopt; } else {
return *realisations.begin(); auto realisations = worker_proto::read(
} *this, conn->from, Phantom<std::set<Realisation>> {});
if (realisations.empty())
return nullptr;
return std::make_shared<const Realisation>(*realisations.begin());
}
}();
try {
callback(std::shared_ptr<const Realisation>(real));
} catch (...) { return callback.rethrow(); }
} }
static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs) static void writeDerivedPaths(RemoteStore & store, ConnectionHandle & conn, const std::vector<DerivedPath> & reqs)

View file

@ -88,7 +88,8 @@ public:
void registerDrvOutput(const Realisation & info) override; void registerDrvOutput(const Realisation & info) override;
std::optional<const Realisation> queryRealisation(const DrvOutput &) override; void queryRealisationUncached(const DrvOutput &,
Callback<std::shared_ptr<const Realisation>> callback) noexcept override;
void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override; void buildPaths(const std::vector<DerivedPath> & paths, BuildMode buildMode, std::shared_ptr<Store> evalStore) override;

View file

@ -542,6 +542,74 @@ void Store::queryPathInfo(const StorePath & storePath,
}}); }});
} }
void Store::queryRealisation(const DrvOutput & id,
Callback<std::shared_ptr<const Realisation>> callback) noexcept
{
try {
if (diskCache) {
auto [cacheOutcome, maybeCachedRealisation]
= diskCache->lookupRealisation(getUri(), id);
switch (cacheOutcome) {
case NarInfoDiskCache::oValid:
debug("Returning a cached realisation for %s", id.to_string());
callback(maybeCachedRealisation);
return;
case NarInfoDiskCache::oInvalid:
debug(
"Returning a cached missing realisation for %s",
id.to_string());
callback(nullptr);
return;
case NarInfoDiskCache::oUnknown:
break;
}
}
} catch (...) {
return callback.rethrow();
}
auto callbackPtr
= std::make_shared<decltype(callback)>(std::move(callback));
queryRealisationUncached(
id,
{ [this, id, callbackPtr](
std::future<std::shared_ptr<const Realisation>> fut) {
try {
auto info = fut.get();
if (diskCache) {
if (info)
diskCache->upsertRealisation(getUri(), *info);
else
diskCache->upsertAbsentRealisation(getUri(), id);
}
(*callbackPtr)(std::shared_ptr<const Realisation>(info));
} catch (...) {
callbackPtr->rethrow();
}
} });
}
std::shared_ptr<const Realisation> Store::queryRealisation(const DrvOutput & id)
{
using RealPtr = std::shared_ptr<const Realisation>;
std::promise<RealPtr> promise;
queryRealisation(id,
{[&](std::future<RealPtr> result) {
try {
promise.set_value(result.get());
} catch (...) {
promise.set_exception(std::current_exception());
}
}});
return promise.get_future().get();
}
void Store::substitutePaths(const StorePathSet & paths) void Store::substitutePaths(const StorePathSet & paths)
{ {

View file

@ -369,6 +369,14 @@ public:
void queryPathInfo(const StorePath & path, void queryPathInfo(const StorePath & path,
Callback<ref<const ValidPathInfo>> callback) noexcept; Callback<ref<const ValidPathInfo>> callback) noexcept;
/* Query the information about a realisation. */
std::shared_ptr<const Realisation> queryRealisation(const DrvOutput &);
/* Asynchronous version of queryRealisation(). */
void queryRealisation(const DrvOutput &,
Callback<std::shared_ptr<const Realisation>> callback) noexcept;
/* Check whether the given valid path info is sufficiently attested, by /* Check whether the given valid path info is sufficiently attested, by
either being signed by a trusted public key or content-addressed, in either being signed by a trusted public key or content-addressed, in
order to be included in the given store. order to be included in the given store.
@ -393,11 +401,11 @@ protected:
virtual void queryPathInfoUncached(const StorePath & path, virtual void queryPathInfoUncached(const StorePath & path,
Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept = 0; Callback<std::shared_ptr<const ValidPathInfo>> callback) noexcept = 0;
virtual void queryRealisationUncached(const DrvOutput &,
Callback<std::shared_ptr<const Realisation>> callback) noexcept = 0;
public: public:
virtual std::optional<const Realisation> queryRealisation(const DrvOutput &) = 0;
/* Queries the set of incoming FS references for a store path. /* Queries the set of incoming FS references for a store path.
The result is not cleared. */ The result is not cleared. */
virtual void queryReferrers(const StorePath & path, StorePathSet & referrers) virtual void queryReferrers(const StorePath & path, StorePathSet & referrers)