libstore: propagate goal exceptions using promises

drop childException since it's no longer needed. also makes
waitForInput, childFinished, and childTerminated redundant.

Change-Id: I05d88ffd323c5b5c909ac21056162f69ffb0eb9f
This commit is contained in:
eldritch horrors 2024-10-05 00:38:35 +02:00
parent 7ef4466018
commit f389a54079
3 changed files with 27 additions and 57 deletions

View file

@ -57,6 +57,7 @@ try {
while (auto item = co_await collectDeps.next()) { while (auto item = co_await collectDeps.next()) {
auto & [dep, _result] = *item; auto & [dep, _result] = *item;
BOOST_OUTCOME_CO_TRYV(_result);
waiteeDone(dep); waiteeDone(dep);

View file

@ -1,3 +1,4 @@
#include "async-collect.hh"
#include "charptr-cast.hh" #include "charptr-cast.hh"
#include "worker.hh" #include "worker.hh"
#include "finally.hh" #include "finally.hh"
@ -6,6 +7,8 @@
#include "local-derivation-goal.hh" #include "local-derivation-goal.hh"
#include "signals.hh" #include "signals.hh"
#include "hook-instance.hh" // IWYU pragma: keep #include "hook-instance.hh" // IWYU pragma: keep
#include <boost/outcome/try.hpp>
#include <kj/vector.h>
namespace nix { namespace nix {
@ -231,20 +234,9 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
if (result.has_value()) { if (result.has_value()) {
goalFinished(goal, result.assume_value()); goalFinished(goal, result.assume_value());
} else { } else {
childException = result.assume_error(); goal->notify->fulfill(result.assume_error());
} }
}) }));
.attach(Finally{[this, goal] {
childTerminated(goal);
}}));
}
void Worker::childTerminated(GoalPtr goal)
{
if (childFinished) {
childFinished->fulfill();
}
} }
@ -282,9 +274,12 @@ std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
running = true; running = true;
Finally const _stop([&] { running = false; }); Finally const _stop([&] { running = false; });
std::vector<GoalPtr> results;
topGoals.clear(); topGoals.clear();
for (auto & [goal, _promise] : _topGoals) { for (auto & [goal, _promise] : _topGoals) {
topGoals.insert(goal); topGoals.insert(goal);
results.push_back(goal);
} }
auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<void>>(); auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<void>>();
@ -292,8 +287,9 @@ std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
return result::failure(std::make_exception_ptr(makeInterrupted())); return result::failure(std::make_exception_ptr(makeInterrupted()));
}); });
auto promise = auto promise = runImpl(std::move(_topGoals))
runImpl().exclusiveJoin(updateStatistics()).exclusiveJoin(std::move(onInterrupt.promise)); .exclusiveJoin(updateStatistics())
.exclusiveJoin(std::move(onInterrupt.promise));
// TODO GC interface? // TODO GC interface?
if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0) { if (auto localStore = dynamic_cast<LocalStore *>(&store); localStore && settings.minFree != 0) {
@ -303,27 +299,24 @@ std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
promise.wait(aio.waitScope).value(); promise.wait(aio.waitScope).value();
std::vector<GoalPtr> results;
for (auto & [i, _p] : _topGoals) {
results.push_back(i);
}
return results; return results;
} }
kj::Promise<Result<void>> Worker::runImpl() kj::Promise<Result<void>> Worker::runImpl(Targets _topGoals)
try { try {
debug("entered goal loop"); debug("entered goal loop");
while (1) { kj::Vector<Targets::value_type> promises(_topGoals.size());
for (auto & gp : _topGoals) {
promises.add(std::move(gp));
}
auto collect = AsyncCollect(promises.releaseAsArray());
while (auto done = co_await collect.next()) {
// propagate goal exceptions outward
BOOST_OUTCOME_CO_TRYV(done->second);
if (topGoals.empty()) break; if (topGoals.empty()) break;
/* Wait for input. */
if (!children.isEmpty())
(co_await waitForInput()).value();
if (childException) {
std::rethrow_exception(childException);
}
} }
/* If --keep-going is not set, it's possible that the main goal /* If --keep-going is not set, it's possible that the main goal
@ -346,18 +339,6 @@ try {
co_return result::failure(std::current_exception()); co_return result::failure(std::current_exception());
} }
kj::Promise<Result<void>> Worker::waitForInput()
try {
printMsg(lvlVomit, "waiting for children");
auto pair = kj::newPromiseAndFulfiller<void>();
this->childFinished = kj::mv(pair.fulfiller);
co_await pair.promise;
co_return result::success();
} catch (...) {
co_return result::failure(std::current_exception());
}
unsigned int Worker::failingExitStatus() unsigned int Worker::failingExitStatus()
{ {

View file

@ -84,6 +84,9 @@ protected:
*/ */
class Worker : public WorkerBase class Worker : public WorkerBase
{ {
public:
using Targets = std::map<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>;
private: private:
bool running = false; bool running = false;
@ -143,13 +146,6 @@ private:
void goalFinished(GoalPtr goal, Goal::WorkResult & f); void goalFinished(GoalPtr goal, Goal::WorkResult & f);
kj::Own<kj::PromiseFulfiller<void>> childFinished;
/**
* Wait for input to become available.
*/
kj::Promise<Result<void>> waitForInput();
/** /**
* Remove a dead goal. * Remove a dead goal.
*/ */
@ -160,11 +156,6 @@ private:
*/ */
void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise); void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise);
/**
* Unregisters a running child process.
*/
void childTerminated(GoalPtr goal);
/** /**
* Pass current stats counters to the logger for progress bar updates. * Pass current stats counters to the logger for progress bar updates.
*/ */
@ -181,7 +172,7 @@ private:
statisticsUpdateInhibitor = {}; statisticsUpdateInhibitor = {};
} }
kj::Promise<Result<void>> runImpl(); kj::Promise<Result<void>> runImpl(Targets _topGoals);
kj::Promise<Result<void>> boopGC(LocalStore & localStore); kj::Promise<Result<void>> boopGC(LocalStore & localStore);
public: public:
@ -197,7 +188,6 @@ public:
private: private:
kj::TaskSet children; kj::TaskSet children;
std::exception_ptr childException;
public: public:
struct HookState { struct HookState {
@ -277,8 +267,6 @@ private:
makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override;
public: public:
using Targets = std::map<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>;
/** /**
* Loop until the specified top-level goals have finished. * Loop until the specified top-level goals have finished.
*/ */