diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index cf52280ed..ef5e8ae96 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -57,6 +57,7 @@ try { while (auto item = co_await collectDeps.next()) { auto & [dep, _result] = *item; + BOOST_OUTCOME_CO_TRYV(_result); waiteeDone(dep); diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 839b56bc8..5ca7cde76 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -1,3 +1,4 @@ +#include "async-collect.hh" #include "charptr-cast.hh" #include "worker.hh" #include "finally.hh" @@ -6,6 +7,8 @@ #include "local-derivation-goal.hh" #include "signals.hh" #include "hook-instance.hh" // IWYU pragma: keep +#include +#include namespace nix { @@ -231,20 +234,9 @@ void Worker::childStarted(GoalPtr goal, kj::Promise> pr if (result.has_value()) { goalFinished(goal, result.assume_value()); } else { - childException = result.assume_error(); + goal->notify->fulfill(result.assume_error()); } - }) - .attach(Finally{[this, goal] { - childTerminated(goal); - }})); -} - - -void Worker::childTerminated(GoalPtr goal) -{ - if (childFinished) { - childFinished->fulfill(); - } + })); } @@ -282,9 +274,12 @@ std::vector Worker::run(std::function req) running = true; Finally const _stop([&] { running = false; }); + std::vector results; + topGoals.clear(); for (auto & [goal, _promise] : _topGoals) { topGoals.insert(goal); + results.push_back(goal); } auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller>(); @@ -292,8 +287,9 @@ std::vector Worker::run(std::function req) return result::failure(std::make_exception_ptr(makeInterrupted())); }); - auto promise = - runImpl().exclusiveJoin(updateStatistics()).exclusiveJoin(std::move(onInterrupt.promise)); + auto promise = runImpl(std::move(_topGoals)) + .exclusiveJoin(updateStatistics()) + .exclusiveJoin(std::move(onInterrupt.promise)); // TODO GC interface? if (auto localStore = dynamic_cast(&store); localStore && settings.minFree != 0) { @@ -303,27 +299,24 @@ std::vector Worker::run(std::function req) promise.wait(aio.waitScope).value(); - std::vector results; - for (auto & [i, _p] : _topGoals) { - results.push_back(i); - } return results; } -kj::Promise> Worker::runImpl() +kj::Promise> Worker::runImpl(Targets _topGoals) try { debug("entered goal loop"); - while (1) { + kj::Vector promises(_topGoals.size()); + for (auto & gp : _topGoals) { + promises.add(std::move(gp)); + } + + auto collect = AsyncCollect(promises.releaseAsArray()); + while (auto done = co_await collect.next()) { + // propagate goal exceptions outward + BOOST_OUTCOME_CO_TRYV(done->second); + if (topGoals.empty()) break; - - /* Wait for input. */ - if (!children.isEmpty()) - (co_await waitForInput()).value(); - - if (childException) { - std::rethrow_exception(childException); - } } /* If --keep-going is not set, it's possible that the main goal @@ -346,18 +339,6 @@ try { co_return result::failure(std::current_exception()); } -kj::Promise> Worker::waitForInput() -try { - printMsg(lvlVomit, "waiting for children"); - - auto pair = kj::newPromiseAndFulfiller(); - this->childFinished = kj::mv(pair.fulfiller); - co_await pair.promise; - co_return result::success(); -} catch (...) { - co_return result::failure(std::current_exception()); -} - unsigned int Worker::failingExitStatus() { diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 78e204b5a..26832c3b1 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -84,6 +84,9 @@ protected: */ class Worker : public WorkerBase { +public: + using Targets = std::map>>; + private: bool running = false; @@ -143,13 +146,6 @@ private: void goalFinished(GoalPtr goal, Goal::WorkResult & f); - kj::Own> childFinished; - - /** - * Wait for input to become available. - */ - kj::Promise> waitForInput(); - /** * Remove a dead goal. */ @@ -160,11 +156,6 @@ private: */ void childStarted(GoalPtr goal, kj::Promise> promise); - /** - * Unregisters a running child process. - */ - void childTerminated(GoalPtr goal); - /** * Pass current stats counters to the logger for progress bar updates. */ @@ -181,7 +172,7 @@ private: statisticsUpdateInhibitor = {}; } - kj::Promise> runImpl(); + kj::Promise> runImpl(Targets _topGoals); kj::Promise> boopGC(LocalStore & localStore); public: @@ -197,7 +188,6 @@ public: private: kj::TaskSet children; - std::exception_ptr childException; public: struct HookState { @@ -277,8 +267,6 @@ private: makeGoal(const DerivedPath & req, BuildMode buildMode = bmNormal) override; public: - using Targets = std::map>>; - /** * Loop until the specified top-level goals have finished. */