libstore: make waiting for a while a promise

this simplifies waitForInput quite a lot, and at the same time makes
polling less thundering-herd-y. it even fixes early polling wakeups!

Change-Id: I6dfa62ce91729b8880342117d71af5ae33366414
This commit is contained in:
eldritch horrors 2024-09-01 01:37:10 +02:00
parent 0478949c72
commit cd1ceffb0e
6 changed files with 51 additions and 68 deletions

View file

@ -736,7 +736,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
return {WaitForAWhile{}};
return waitForAWhile();
}
actLock.reset();
@ -776,32 +776,32 @@ try {
auto hookReply = tryBuildHook(inBuildSlot);
auto result = std::visit(
overloaded{
[&](HookReply::Accept & a) -> std::optional<WorkResult> {
[&](HookReply::Accept & a) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Yes, it has started doing so. Wait until we get
EOF from the hook. */
actLock.reset();
buildResult.startTime = time(0); // inexact
state = &DerivationGoal::buildDone;
started();
return WaitForWorld{std::move(a.promise), false};
return {{WaitForWorld{std::move(a.promise), false}}};
},
[&](HookReply::Postpone) -> std::optional<WorkResult> {
[&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* Not now; wait until at least one child finishes or
the wake-up timeout expires. */
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
outputLocks.unlock();
return WaitForAWhile{};
return waitForAWhile();
},
[&](HookReply::Decline) -> std::optional<WorkResult> {
[&](HookReply::Decline) -> std::optional<kj::Promise<Result<WorkResult>>> {
/* We should do it ourselves. */
return std::nullopt;
},
},
hookReply);
if (result) {
return {std::move(*result)};
return std::move(*result);
}
}

View file

@ -1,4 +1,6 @@
#include "goal.hh"
#include "worker.hh"
#include <kj/time.h>
namespace nix {
@ -15,4 +17,15 @@ void Goal::trace(std::string_view s)
debug("%1%: %2%", name, s);
}
kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile()
try {
trace("wait for a while");
/* If we are polling goals that are waiting for a lock, then wake
up after a few seconds at most. */
co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
co_return ContinueImmediately{};
} catch (...) {
co_return std::current_exception();
}
}

View file

@ -118,7 +118,6 @@ public:
struct [[nodiscard]] StillAlive {};
struct [[nodiscard]] WaitForSlot {};
struct [[nodiscard]] WaitForAWhile {};
struct [[nodiscard]] ContinueImmediately {};
struct [[nodiscard]] WaitForGoals {
Goals goals;
@ -140,7 +139,6 @@ public:
struct [[nodiscard]] WorkResult : std::variant<
StillAlive,
WaitForSlot,
WaitForAWhile,
ContinueImmediately,
WaitForGoals,
WaitForWorld,
@ -150,6 +148,11 @@ public:
using variant::variant;
};
protected:
kj::Promise<Result<WorkResult>> waitForAWhile();
public:
/**
* Exception containing an error message, if any.
*/

View file

@ -212,7 +212,7 @@ try {
if (!actLock)
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
return {WaitForAWhile{}};
return waitForAWhile();
}
}

View file

@ -32,7 +32,6 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
/* Debugging: prevent recursive workers. */
nrLocalBuilds = 0;
nrSubstitutions = 0;
lastWokenUp = steady_time_point::min();
}
@ -212,7 +211,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
overloaded{
[&](Goal::StillAlive) {},
[&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
[&](Goal::WaitForAWhile) { waitForAWhile(goal); },
[&](Goal::ContinueImmediately) { wakeUp(goal); },
[&](Goal::WaitForGoals & w) {
for (auto & dep : w.goals) {
@ -221,12 +219,25 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
}
},
[&](Goal::WaitForWorld & w) {
childStarted(goal, std::move(w.promise), w.inBuildSlot);
childStarted(
goal,
w.promise.then([](auto r) -> Result<Goal::WorkResult> {
if (r.has_value()) {
return {Goal::ContinueImmediately{}};
} else if (r.has_error()) {
return {std::move(r).error()};
} else {
return r.exception();
}
}),
w.inBuildSlot
);
},
[&](Goal::Finished & f) { goalFinished(goal, f); },
},
how
);
updateStatistics();
}
void Worker::removeGoal(GoalPtr goal)
@ -257,17 +268,15 @@ void Worker::wakeUp(GoalPtr goal)
}
void Worker::childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
bool inBuildSlot)
{
children.add(promise
.then([this, goal](auto result) {
if (result.has_value()) {
handleWorkResult(goal, Goal::ContinueImmediately{});
} else if (result.has_error()) {
handleWorkResult(goal, std::move(result.assume_error()));
handleWorkResult(goal, std::move(result.assume_value()));
} else {
childException = result.assume_exception();
childException = result.assume_error();
}
})
.attach(Finally{[this, goal, inBuildSlot] {
@ -331,13 +340,6 @@ void Worker::waitForBuildSlot(GoalPtr goal)
}
void Worker::waitForAWhile(GoalPtr goal)
{
debug("wait for a while");
waitingForAWhile.insert(goal);
}
void Worker::updateStatistics()
{
// only update progress info while running. this notably excludes updating
@ -397,8 +399,12 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
const bool inSlot = goal->jobCategory() == JobCategory::Substitution
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
: nrLocalBuilds < settings.maxBuildJobs;
handleWorkResult(goal, goal->work(inSlot).wait(aio.waitScope).value());
updateStatistics();
auto result = goal->work(inSlot);
if (result.poll(aio.waitScope)) {
handleWorkResult(goal, result.wait(aio.waitScope).value());
} else {
childStarted(goal, std::move(result), false);
}
if (topGoals.empty()) break; // stuff may have been cancelled
}
@ -407,7 +413,7 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
if (topGoals.empty()) break;
/* Wait for input. */
if (!children.isEmpty() || !waitingForAWhile.empty())
if (!children.isEmpty())
waitForInput();
else {
assert(!awake.empty());
@ -445,22 +451,12 @@ void Worker::waitForInput()
terminated. */
std::optional<long> timeout = 0;
auto before = steady_time_point::clock::now();
// Periodicallty wake up to see if we need to run the garbage collector.
if (settings.minFree.get() != 0) {
timeout = 10;
}
/* If we are polling goals that are waiting for a lock, then wake
up after a few seconds at most. */
if (!waitingForAWhile.empty()) {
if (lastWokenUp == steady_time_point::min() || lastWokenUp > before) lastWokenUp = before;
timeout = std::max(1L,
(long) std::chrono::duration_cast<std::chrono::seconds>(
lastWokenUp + std::chrono::seconds(settings.pollInterval) - before).count());
} else lastWokenUp = steady_time_point::min();
if (timeout)
vomit("sleeping %d seconds", *timeout);
@ -475,17 +471,6 @@ void Worker::waitForInput()
}();
waitFor.wait(aio.waitScope);
auto after = steady_time_point::clock::now();
if (!waitingForAWhile.empty() && lastWokenUp + std::chrono::seconds(settings.pollInterval) <= after) {
lastWokenUp = after;
for (auto & i : waitingForAWhile) {
GoalPtr goal = i.lock();
if (goal) wakeUp(goal);
}
waitingForAWhile.clear();
}
}

View file

@ -117,16 +117,6 @@ private:
std::map<StorePath, std::weak_ptr<PathSubstitutionGoal>> substitutionGoals;
std::map<DrvOutput, std::weak_ptr<DrvOutputSubstitutionGoal>> drvOutputSubstitutionGoals;
/**
* Goals sleeping for a few seconds (polling a lock).
*/
WeakGoals waitingForAWhile;
/**
* Last time the goals in `waitingForAWhile` where woken up.
*/
steady_time_point lastWokenUp;
/**
* Cache for pathContentsGood().
*/
@ -164,14 +154,6 @@ private:
*/
void waitForBuildSlot(GoalPtr goal);
/**
* Wait for a few seconds and then retry this goal. Used when
* waiting for a lock held by another process. This kind of
* polling is inefficient, but POSIX doesn't really provide a way
* to wait for multiple locks in the main select() loop.
*/
void waitForAWhile(GoalPtr goal);
/**
* Wake up a goal (i.e., there is something for it to do).
*/
@ -191,7 +173,7 @@ private:
* Registers a running child process. `inBuildSlot` means that
* the process counts towards the jobs limit.
*/
void childStarted(GoalPtr goal, kj::Promise<Outcome<void, Goal::Finished>> promise,
void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
bool inBuildSlot);
/**