diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index c95092913..3c4257f08 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -134,9 +134,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex) } -kj::Promise> DerivationGoal::work(bool inBuildSlot) noexcept +kj::Promise> DerivationGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs) @@ -783,7 +783,7 @@ try { buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return {{WaitForWorld{std::move(a.promise), false}}}; + return {{WaitForWorld{std::move(a.promise)}}}; }, [&](HookReply::Postpone) -> std::optional>> { /* Not now; wait until at least one child finishes or @@ -980,6 +980,7 @@ kj::Promise> DerivationGoal::buildDone(bool inBuildSlot try { trace("build done"); + slotToken = {}; Finally releaseBuildUser([&](){ this->cleanupHookFinally(); }); cleanupPreChildKill(); diff --git a/src/libstore/build/derivation-goal.hh b/src/libstore/build/derivation-goal.hh index 46b07fc0b..d60bb0b4c 100644 --- a/src/libstore/build/derivation-goal.hh +++ b/src/libstore/build/derivation-goal.hh @@ -249,7 +249,7 @@ struct DerivationGoal : public Goal std::string key() override; - kj::Promise> work(bool inBuildSlot) noexcept override; + kj::Promise> work() noexcept override; /** * Add wanted outputs to an already existing derivation goal. diff --git a/src/libstore/build/drv-output-substitution-goal.cc b/src/libstore/build/drv-output-substitution-goal.cc index fdee53699..6ef00d1ff 100644 --- a/src/libstore/build/drv-output-substitution-goal.cc +++ b/src/libstore/build/drv-output-substitution-goal.cc @@ -42,7 +42,10 @@ try { trace("trying next substituter"); if (!inBuildSlot) { - return {WaitForSlot{}}; + return worker.substitutions.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); @@ -81,7 +84,7 @@ try { state = &DrvOutputSubstitutionGoal::realisationFetched; return {WaitForWorld{ - pipe.promise.then([]() -> Outcome { return result::success(); }), true + pipe.promise.then([]() -> Outcome { return result::success(); }) }}; } catch (...) { return {std::current_exception()}; @@ -90,6 +93,7 @@ try { kj::Promise> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept try { maintainRunningSubstitutions.reset(); + slotToken = {}; try { outputInfo = downloadState->result.get(); @@ -168,9 +172,9 @@ std::string DrvOutputSubstitutionGoal::key() return "a$" + std::string(id.to_string()); } -kj::Promise> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept +kj::Promise> DrvOutputSubstitutionGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } diff --git a/src/libstore/build/drv-output-substitution-goal.hh b/src/libstore/build/drv-output-substitution-goal.hh index a35bf67ee..805b65bfa 100644 --- a/src/libstore/build/drv-output-substitution-goal.hh +++ b/src/libstore/build/drv-output-substitution-goal.hh @@ -76,7 +76,7 @@ public: std::string key() override; - kj::Promise> work(bool inBuildSlot) noexcept override; + kj::Promise> work() noexcept override; JobCategory jobCategory() const override { return JobCategory::Substitution; diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index fbf767e8d..1ccf9716b 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -1,6 +1,7 @@ #pragma once ///@file +#include "async-semaphore.hh" #include "result.hh" #include "types.hh" #include "store-api.hh" @@ -112,19 +113,20 @@ struct Goal */ BuildResult buildResult; +protected: + AsyncSemaphore::Token slotToken; + public: struct Finished; struct [[nodiscard]] StillAlive {}; - struct [[nodiscard]] WaitForSlot {}; struct [[nodiscard]] ContinueImmediately {}; struct [[nodiscard]] WaitForGoals { Goals goals; }; struct [[nodiscard]] WaitForWorld { kj::Promise> promise; - bool inBuildSlot; }; struct [[nodiscard]] Finished { ExitCode exitCode; @@ -138,7 +140,6 @@ public: struct [[nodiscard]] WorkResult : std::variant< StillAlive, - WaitForSlot, ContinueImmediately, WaitForGoals, WaitForWorld, @@ -168,7 +169,7 @@ public: trace("goal destroyed"); } - virtual kj::Promise> work(bool inBuildSlot) noexcept = 0; + virtual kj::Promise> work() noexcept = 0; virtual void waiteeDone(GoalPtr waitee) { } diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index 9ec87f1b6..2443cfb5a 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -156,8 +156,11 @@ try { if (!inBuildSlot) { state = &DerivationGoal::tryToBuild; outputLocks.unlock(); - if (0U != settings.maxBuildJobs) { - return {WaitForSlot{}}; + if (worker.localBuilds.capacity() > 0) { + return worker.localBuilds.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } if (getMachines().empty()) { throw Error( @@ -248,7 +251,7 @@ try { state = &DerivationGoal::buildDone; started(); - return {WaitForWorld{std::move(promise), true}}; + return {WaitForWorld{std::move(promise)}}; } catch (BuildError & e) { outputLocks.unlock(); diff --git a/src/libstore/build/substitution-goal.cc b/src/libstore/build/substitution-goal.cc index 058f858d4..6d90196fa 100644 --- a/src/libstore/build/substitution-goal.cc +++ b/src/libstore/build/substitution-goal.cc @@ -45,9 +45,9 @@ Goal::Finished PathSubstitutionGoal::done( } -kj::Promise> PathSubstitutionGoal::work(bool inBuildSlot) noexcept +kj::Promise> PathSubstitutionGoal::work() noexcept { - return (this->*state)(inBuildSlot); + return (this->*state)(slotToken.valid()); } @@ -203,7 +203,10 @@ try { trace("trying to run"); if (!inBuildSlot) { - return {WaitForSlot{}}; + return worker.substitutions.acquire().then([this](auto token) { + slotToken = std::move(token); + return work(); + }); } maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1); @@ -236,7 +239,7 @@ try { state = &PathSubstitutionGoal::finished; return {WaitForWorld{ - pipe.promise.then([]() -> Outcome { return result::success(); }), true + pipe.promise.then([]() -> Outcome { return result::success(); }) }}; } catch (...) { return {std::current_exception()}; @@ -248,6 +251,7 @@ try { trace("substitute finished"); try { + slotToken = {}; thr.get(); } catch (std::exception & e) { printError(e.what()); diff --git a/src/libstore/build/substitution-goal.hh b/src/libstore/build/substitution-goal.hh index 91e256fd7..cef3a4c5c 100644 --- a/src/libstore/build/substitution-goal.hh +++ b/src/libstore/build/substitution-goal.hh @@ -99,7 +99,7 @@ public: return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath); } - kj::Promise> work(bool inBuildSlot) noexcept override; + kj::Promise> work() noexcept override; /** * The states. diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 2cc2828b1..e19917d91 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -27,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) , store(store) , evalStore(evalStore) , aio(aio) + /* Make sure that we are always allowed to run at least one substitution. + This prevents infinite waiting. */ + , substitutions(std::max(1, settings.maxSubstitutionJobs)) + , localBuilds(settings.maxBuildJobs) , children(errorHandler) { /* Debugging: prevent recursive workers. */ - nrLocalBuilds = 0; - nrSubstitutions = 0; } @@ -210,7 +212,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) std::visit( overloaded{ [&](Goal::StillAlive) {}, - [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, [&](Goal::ContinueImmediately) { wakeUp(goal); }, [&](Goal::WaitForGoals & w) { for (auto & dep : w.goals) { @@ -219,19 +220,15 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) } }, [&](Goal::WaitForWorld & w) { - childStarted( - goal, - w.promise.then([](auto r) -> Result { - if (r.has_value()) { - return {Goal::ContinueImmediately{}}; - } else if (r.has_error()) { - return {std::move(r).error()}; - } else { - return r.exception(); - } - }), - w.inBuildSlot - ); + childStarted(goal, w.promise.then([](auto r) -> Result { + if (r.has_value()) { + return {Goal::ContinueImmediately{}}; + } else if (r.has_error()) { + return {std::move(r).error()}; + } else { + return r.exception(); + } + })); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, @@ -268,8 +265,7 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, kj::Promise> promise, - bool inBuildSlot) +void Worker::childStarted(GoalPtr goal, kj::Promise> promise) { children.add(promise .then([this, goal](auto result) { @@ -279,64 +275,17 @@ void Worker::childStarted(GoalPtr goal, kj::Promise> pr childException = result.assume_error(); } }) - .attach(Finally{[this, goal, inBuildSlot] { - childTerminated(goal, inBuildSlot); + .attach(Finally{[this, goal] { + childTerminated(goal); }})); - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - nrSubstitutions++; - break; - case JobCategory::Build: - nrLocalBuilds++; - break; - default: - abort(); - } - } } -void Worker::childTerminated(GoalPtr goal, bool inBuildSlot) +void Worker::childTerminated(GoalPtr goal) { if (childFinished) { childFinished->fulfill(); } - - if (inBuildSlot) { - switch (goal->jobCategory()) { - case JobCategory::Substitution: - assert(nrSubstitutions > 0); - nrSubstitutions--; - break; - case JobCategory::Build: - assert(nrLocalBuilds > 0); - nrLocalBuilds--; - break; - default: - abort(); - } - } - - /* Wake up goals waiting for a build slot. */ - for (auto & j : wantingToBuild) { - GoalPtr goal = j.lock(); - if (goal) wakeUp(goal); - } - - wantingToBuild.clear(); -} - - -void Worker::waitForBuildSlot(GoalPtr goal) -{ - goal->trace("wait for build slot"); - bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution; - if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) || - (isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs)) - wakeUp(goal); /* we can do it right away */ - else - wantingToBuild.insert(goal); } @@ -394,16 +343,11 @@ Goals Worker::run(std::function req) awake.clear(); for (auto & goal : awake2) { checkInterrupt(); - /* Make sure that we are always allowed to run at least one substitution. - This prevents infinite waiting. */ - const bool inSlot = goal->jobCategory() == JobCategory::Substitution - ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) - : nrLocalBuilds < settings.maxBuildJobs; - auto result = goal->work(inSlot); + auto result = goal->work(); if (result.poll(aio.waitScope)) { handleWorkResult(goal, result.wait(aio.waitScope).value()); } else { - childStarted(goal, std::move(result), false); + childStarted(goal, std::move(result)); } if (topGoals.empty()) break; // stuff may have been cancelled @@ -428,7 +372,6 @@ Goals Worker::run(std::function req) exited while some of its subgoals were still active. But if --keep-going *is* set, then they must all be finished now. */ assert(!settings.keepGoing || awake.empty()); - assert(!settings.keepGoing || wantingToBuild.empty()); assert(!settings.keepGoing || children.isEmpty()); return _topGoals; diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index daa612c06..834ecfda3 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -1,6 +1,7 @@ #pragma once ///@file +#include "async-semaphore.hh" #include "notifying-counter.hh" #include "types.hh" #include "lock.hh" @@ -93,22 +94,6 @@ private: */ WeakGoals awake; - /** - * Goals waiting for a build slot. - */ - WeakGoals wantingToBuild; - - /** - * Number of build slots occupied. This includes local builds but does not - * include substitutions or remote builds via the build hook. - */ - unsigned int nrLocalBuilds; - - /** - * Number of substitution slots occupied. - */ - unsigned int nrSubstitutions; - /** * Maps used to prevent multiple instantiations of a goal for the * same derivation / path. @@ -148,12 +133,6 @@ private: kj::Own> childFinished; - /** - * Put `goal` to sleep until a build slot becomes available (which - * might be right away). - */ - void waitForBuildSlot(GoalPtr goal); - /** * Wake up a goal (i.e., there is something for it to do). */ @@ -170,16 +149,14 @@ private: void removeGoal(GoalPtr goal); /** - * Registers a running child process. `inBuildSlot` means that - * the process counts towards the jobs limit. + * Registers a running child process. */ - void childStarted(GoalPtr goal, kj::Promise> promise, - bool inBuildSlot); + void childStarted(GoalPtr goal, kj::Promise> promise); /** * Unregisters a running child process. */ - void childTerminated(GoalPtr goal, bool inBuildSlot); + void childTerminated(GoalPtr goal); /** * Pass current stats counters to the logger for progress bar updates. @@ -205,6 +182,7 @@ public: Store & store; Store & evalStore; kj::AsyncIoContext & aio; + AsyncSemaphore substitutions, localBuilds; private: kj::TaskSet children;