Make the DrvOutputSubstitutionGoal more async

This commit is contained in:
regnat 2021-10-28 16:43:54 +02:00
parent fbc70034b3
commit f4c869977c
2 changed files with 51 additions and 5 deletions

View file

@ -1,6 +1,8 @@
#include "drv-output-substitution-goal.hh" #include "drv-output-substitution-goal.hh"
#include "finally.hh"
#include "worker.hh" #include "worker.hh"
#include "substitution-goal.hh" #include "substitution-goal.hh"
#include "callback.hh"
namespace nix { namespace nix {
@ -50,14 +52,42 @@ void DrvOutputSubstitutionGoal::tryNext()
return; return;
} }
auto sub = subs.front(); sub = subs.front();
subs.pop_front(); subs.pop_front();
// FIXME: Make async // FIXME: Make async
outputInfo = sub->queryRealisation(id); // outputInfo = sub->queryRealisation(id);
outPipe.create();
promise = decltype(promise)();
sub->queryRealisation(
id, { [&](std::future<std::shared_ptr<const Realisation>> res) {
try {
Finally updateStats([this]() { outPipe.writeSide.close(); });
promise.set_value(res.get());
} catch (...) {
promise.set_exception(std::current_exception());
}
} });
worker.childStarted(shared_from_this(), {outPipe.readSide.get()}, true, false);
state = &DrvOutputSubstitutionGoal::realisationFetched;
}
void DrvOutputSubstitutionGoal::realisationFetched()
{
worker.childTerminated(this);
try {
outputInfo = promise.get_future().get();
} catch (std::exception & e) {
printError(e.what());
substituterFailed = true;
}
if (!outputInfo) { if (!outputInfo) {
tryNext(); return tryNext();
return;
} }
for (const auto & [depId, depPath] : outputInfo->dependentRealisations) { for (const auto & [depId, depPath] : outputInfo->dependentRealisations) {
@ -119,4 +149,10 @@ void DrvOutputSubstitutionGoal::work()
(this->*state)(); (this->*state)();
} }
void DrvOutputSubstitutionGoal::handleEOF(int fd)
{
if (fd == outPipe.readSide.get()) worker.wakeUp(shared_from_this());
}
} }

View file

@ -3,6 +3,8 @@
#include "store-api.hh" #include "store-api.hh"
#include "goal.hh" #include "goal.hh"
#include "realisation.hh" #include "realisation.hh"
#include <thread>
#include <future>
namespace nix { namespace nix {
@ -25,6 +27,13 @@ private:
/* The remaining substituters. */ /* The remaining substituters. */
std::list<ref<Store>> subs; std::list<ref<Store>> subs;
/* The current substituter. */
std::shared_ptr<Store> sub;
Pipe outPipe;
std::thread thr;
std::promise<std::shared_ptr<const Realisation>> promise;
/* Whether a substituter failed. */ /* Whether a substituter failed. */
bool substituterFailed = false; bool substituterFailed = false;
@ -36,6 +45,7 @@ public:
void init(); void init();
void tryNext(); void tryNext();
void realisationFetched();
void outPathValid(); void outPathValid();
void finished(); void finished();
@ -44,7 +54,7 @@ public:
string key() override; string key() override;
void work() override; void work() override;
void handleEOF(int fd) override;
}; };
} }