diff --git a/src/libstore/build/derivation-goal.cc b/src/libstore/build/derivation-goal.cc index f40611b31..c95092913 100644 --- a/src/libstore/build/derivation-goal.cc +++ b/src/libstore/build/derivation-goal.cc @@ -736,7 +736,7 @@ try { if (!actLock) actLock = std::make_unique(*logger, lvlWarn, actBuildWaiting, fmt("waiting for lock on %s", Magenta(showPaths(lockFiles)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } actLock.reset(); @@ -776,32 +776,32 @@ try { auto hookReply = tryBuildHook(inBuildSlot); auto result = std::visit( overloaded{ - [&](HookReply::Accept & a) -> std::optional { + [&](HookReply::Accept & a) -> std::optional>> { /* Yes, it has started doing so. Wait until we get EOF from the hook. */ actLock.reset(); buildResult.startTime = time(0); // inexact state = &DerivationGoal::buildDone; started(); - return WaitForWorld{std::move(a.promise), false}; + return {{WaitForWorld{std::move(a.promise), false}}}; }, - [&](HookReply::Postpone) -> std::optional { + [&](HookReply::Postpone) -> std::optional>> { /* Not now; wait until at least one child finishes or the wake-up timeout expires. */ if (!actLock) actLock = std::make_unique(*logger, lvlTalkative, actBuildWaiting, fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath)))); outputLocks.unlock(); - return WaitForAWhile{}; + return waitForAWhile(); }, - [&](HookReply::Decline) -> std::optional { + [&](HookReply::Decline) -> std::optional>> { /* We should do it ourselves. */ return std::nullopt; }, }, hookReply); if (result) { - return {std::move(*result)}; + return std::move(*result); } } diff --git a/src/libstore/build/goal.cc b/src/libstore/build/goal.cc index 82861ad2b..649093dbd 100644 --- a/src/libstore/build/goal.cc +++ b/src/libstore/build/goal.cc @@ -1,4 +1,6 @@ #include "goal.hh" +#include "worker.hh" +#include namespace nix { @@ -15,4 +17,15 @@ void Goal::trace(std::string_view s) debug("%1%: %2%", name, s); } +kj::Promise> Goal::waitForAWhile() +try { + trace("wait for a while"); + /* If we are polling goals that are waiting for a lock, then wake + up after a few seconds at most. */ + co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS); + co_return ContinueImmediately{}; +} catch (...) { + co_return std::current_exception(); +} + } diff --git a/src/libstore/build/goal.hh b/src/libstore/build/goal.hh index 3f6e8396e..fbf767e8d 100644 --- a/src/libstore/build/goal.hh +++ b/src/libstore/build/goal.hh @@ -118,7 +118,6 @@ public: struct [[nodiscard]] StillAlive {}; struct [[nodiscard]] WaitForSlot {}; - struct [[nodiscard]] WaitForAWhile {}; struct [[nodiscard]] ContinueImmediately {}; struct [[nodiscard]] WaitForGoals { Goals goals; @@ -140,7 +139,6 @@ public: struct [[nodiscard]] WorkResult : std::variant< StillAlive, WaitForSlot, - WaitForAWhile, ContinueImmediately, WaitForGoals, WaitForWorld, @@ -150,6 +148,11 @@ public: using variant::variant; }; +protected: + kj::Promise> waitForAWhile(); + +public: + /** * Exception containing an error message, if any. */ diff --git a/src/libstore/build/local-derivation-goal.cc b/src/libstore/build/local-derivation-goal.cc index 040fa7461..9ec87f1b6 100644 --- a/src/libstore/build/local-derivation-goal.cc +++ b/src/libstore/build/local-derivation-goal.cc @@ -212,7 +212,7 @@ try { if (!actLock) actLock = std::make_unique(*logger, lvlWarn, actBuildWaiting, fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath)))); - return {WaitForAWhile{}}; + return waitForAWhile(); } } diff --git a/src/libstore/build/worker.cc b/src/libstore/build/worker.cc index 284adbc50..27d8e6ee1 100644 --- a/src/libstore/build/worker.cc +++ b/src/libstore/build/worker.cc @@ -32,7 +32,6 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio) /* Debugging: prevent recursive workers. */ nrLocalBuilds = 0; nrSubstitutions = 0; - lastWokenUp = steady_time_point::min(); } @@ -212,7 +211,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) overloaded{ [&](Goal::StillAlive) {}, [&](Goal::WaitForSlot) { waitForBuildSlot(goal); }, - [&](Goal::WaitForAWhile) { waitForAWhile(goal); }, [&](Goal::ContinueImmediately) { wakeUp(goal); }, [&](Goal::WaitForGoals & w) { for (auto & dep : w.goals) { @@ -221,12 +219,25 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how) } }, [&](Goal::WaitForWorld & w) { - childStarted(goal, std::move(w.promise), w.inBuildSlot); + childStarted( + goal, + w.promise.then([](auto r) -> Result { + if (r.has_value()) { + return {Goal::ContinueImmediately{}}; + } else if (r.has_error()) { + return {std::move(r).error()}; + } else { + return r.exception(); + } + }), + w.inBuildSlot + ); }, [&](Goal::Finished & f) { goalFinished(goal, f); }, }, how ); + updateStatistics(); } void Worker::removeGoal(GoalPtr goal) @@ -257,17 +268,15 @@ void Worker::wakeUp(GoalPtr goal) } -void Worker::childStarted(GoalPtr goal, kj::Promise> promise, +void Worker::childStarted(GoalPtr goal, kj::Promise> promise, bool inBuildSlot) { children.add(promise .then([this, goal](auto result) { if (result.has_value()) { - handleWorkResult(goal, Goal::ContinueImmediately{}); - } else if (result.has_error()) { - handleWorkResult(goal, std::move(result.assume_error())); + handleWorkResult(goal, std::move(result.assume_value())); } else { - childException = result.assume_exception(); + childException = result.assume_error(); } }) .attach(Finally{[this, goal, inBuildSlot] { @@ -331,13 +340,6 @@ void Worker::waitForBuildSlot(GoalPtr goal) } -void Worker::waitForAWhile(GoalPtr goal) -{ - debug("wait for a while"); - waitingForAWhile.insert(goal); -} - - void Worker::updateStatistics() { // only update progress info while running. this notably excludes updating @@ -397,8 +399,12 @@ Goals Worker::run(std::function req) const bool inSlot = goal->jobCategory() == JobCategory::Substitution ? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs) : nrLocalBuilds < settings.maxBuildJobs; - handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value()); - updateStatistics(); + auto result = goal->work(inSlot); + if (result.poll(aio.waitScope)) { + handleWorkResult(goal, result.wait(aio.waitScope).value()); + } else { + childStarted(goal, std::move(result), false); + } if (topGoals.empty()) break; // stuff may have been cancelled } @@ -407,7 +413,7 @@ Goals Worker::run(std::function req) if (topGoals.empty()) break; /* Wait for input. */ - if (!children.isEmpty() || !waitingForAWhile.empty()) + if (!children.isEmpty()) waitForInput(); else { assert(!awake.empty()); @@ -445,22 +451,12 @@ void Worker::waitForInput() terminated. */ std::optional timeout = 0; - auto before = steady_time_point::clock::now(); // Periodicallty wake up to see if we need to run the garbage collector. if (settings.minFree.get() != 0) { timeout = 10; } - /* If we are polling goals that are waiting for a lock, then wake - up after a few seconds at most. */ - if (!waitingForAWhile.empty()) { - if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before; - timeout = std::max(1L, - (long) std::chrono::duration_cast( - lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count()); - } else lastWokenUp = steady_time_point::min(); - if (timeout) vomit("sleeping %d seconds", *timeout); @@ -475,17 +471,6 @@ void Worker::waitForInput() }(); waitFor.wait(aio.waitScope); - - auto after = steady_time_point::clock::now(); - - if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) { - lastWokenUp = after; - for (auto & i : waitingForAWhile) { - GoalPtr goal = i.lock(); - if (goal) wakeUp(goal); - } - waitingForAWhile.clear(); - } } diff --git a/src/libstore/build/worker.hh b/src/libstore/build/worker.hh index 37d80ba7b..daa612c06 100644 --- a/src/libstore/build/worker.hh +++ b/src/libstore/build/worker.hh @@ -117,16 +117,6 @@ private: std::map> substitutionGoals; std::map> drvOutputSubstitutionGoals; - /** - * Goals sleeping for a few seconds (polling a lock). - */ - WeakGoals waitingForAWhile; - - /** - * Last time the goals in `waitingForAWhile` where woken up. - */ - steady_time_point lastWokenUp; - /** * Cache for pathContentsGood(). */ @@ -164,14 +154,6 @@ private: */ void waitForBuildSlot(GoalPtr goal); - /** - * Wait for a few seconds and then retry this goal. Used when - * waiting for a lock held by another process. This kind of - * polling is inefficient, but POSIX doesn't really provide a way - * to wait for multiple locks in the main select() loop. - */ - void waitForAWhile(GoalPtr goal); - /** * Wake up a goal (i.e., there is something for it to do). */ @@ -191,7 +173,7 @@ private: * Registers a running child process. `inBuildSlot` means that * the process counts towards the jobs limit. */ - void childStarted(GoalPtr goal, kj::Promise> promise, + void childStarted(GoalPtr goal, kj::Promise> promise, bool inBuildSlot); /**