libstore: return goal results from Worker::run()

this will be needed to move all interesting result fields out of Goal
proper and into WorkResult. once that is done we can treat goals as a
totally internal construct of the worker mechanism, which also allows
us to fully stop exposing unclear intermediate state to Worker users.

Change-Id: I98d7778a4b5b2590b7b070bdfc164a22a0ef7190
This commit is contained in:
eldritch horrors 2024-10-05 00:38:35 +02:00
parent 40f154c0ed
commit fc6291e46d
3 changed files with 19 additions and 23 deletions

View file

@ -25,7 +25,7 @@ void Store::buildPaths(const std::vector<DerivedPath> & reqs, BuildMode buildMod
StringSet failed; StringSet failed;
std::shared_ptr<Error> ex; std::shared_ptr<Error> ex;
for (auto & i : goals) { for (auto & [i, result] : goals) {
if (i->ex) { if (i->ex) {
if (ex) if (ex)
logError(i->ex->info()); logError(i->ex->info());
@ -89,7 +89,7 @@ BuildResult Store::buildDerivation(const StorePath & drvPath, const BasicDerivat
goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode)); goals.emplace(gf.makeBasicDerivationGoal(drvPath, drv, OutputsSpec::All{}, buildMode));
return goals; return goals;
}); });
auto goal = *goals.begin(); auto [goal, result] = *goals.begin();
return goal->buildResult.restrictTo(DerivedPath::Built { return goal->buildResult.restrictTo(DerivedPath::Built {
.drvPath = makeConstantStorePathRef(drvPath), .drvPath = makeConstantStorePathRef(drvPath),
.outputs = OutputsSpec::All {}, .outputs = OutputsSpec::All {},
@ -116,7 +116,7 @@ void Store::ensurePath(const StorePath & path)
goals.emplace(gf.makePathSubstitutionGoal(path)); goals.emplace(gf.makePathSubstitutionGoal(path));
return goals; return goals;
}); });
auto goal = *goals.begin(); auto [goal, result] = *goals.begin();
if (goal->exitCode != Goal::ecSuccess) { if (goal->exitCode != Goal::ecSuccess) {
if (goal->ex) { if (goal->ex) {
@ -138,7 +138,7 @@ void Store::repairPath(const StorePath & path)
goals.emplace(gf.makePathSubstitutionGoal(path, Repair)); goals.emplace(gf.makePathSubstitutionGoal(path, Repair));
return goals; return goals;
}); });
auto goal = *goals.begin(); auto [goal, result] = *goals.begin();
if (goal->exitCode != Goal::ecSuccess) { if (goal->exitCode != Goal::ecSuccess) {
/* Since substituting the path didn't work, if we have a valid /* Since substituting the path didn't work, if we have a valid

View file

@ -231,7 +231,7 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
} }
kj::Promise<Result<void>> Worker::updateStatistics() kj::Promise<Result<Worker::Results>> Worker::updateStatistics()
try { try {
while (true) { while (true) {
statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire(); statisticsUpdateInhibitor = co_await statisticsUpdateSignal.acquire();
@ -257,7 +257,7 @@ try {
co_return result::failure(std::current_exception()); co_return result::failure(std::current_exception());
} }
std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req) Worker::Results Worker::run(std::function<Targets (GoalFactory &)> req)
{ {
auto topGoals = req(goalFactory()); auto topGoals = req(goalFactory());
@ -265,13 +265,7 @@ std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
running = true; running = true;
Finally const _stop([&] { running = false; }); Finally const _stop([&] { running = false; });
std::vector<GoalPtr> results; auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<Results>>();
for (auto & [goal, _promise] : topGoals) {
results.push_back(goal);
}
auto onInterrupt = kj::newPromiseAndCrossThreadFulfiller<Result<void>>();
auto interruptCallback = createInterruptCallback([&] { auto interruptCallback = createInterruptCallback([&] {
return result::failure(std::make_exception_ptr(makeInterrupted())); return result::failure(std::make_exception_ptr(makeInterrupted()));
}); });
@ -286,12 +280,10 @@ std::vector<GoalPtr> Worker::run(std::function<Targets (GoalFactory &)> req)
promise = promise.exclusiveJoin(boopGC(*localStore)); promise = promise.exclusiveJoin(boopGC(*localStore));
} }
promise.wait(aio.waitScope).value(); return promise.wait(aio.waitScope).value();
return results;
} }
kj::Promise<Result<void>> Worker::runImpl(Targets topGoals) kj::Promise<Result<Worker::Results>> Worker::runImpl(Targets topGoals)
try { try {
debug("entered goal loop"); debug("entered goal loop");
@ -300,10 +292,13 @@ try {
promises.add(std::move(gp)); promises.add(std::move(gp));
} }
Results results;
auto collect = AsyncCollect(promises.releaseAsArray()); auto collect = AsyncCollect(promises.releaseAsArray());
while (auto done = co_await collect.next()) { while (auto done = co_await collect.next()) {
// propagate goal exceptions outward // propagate goal exceptions outward
BOOST_OUTCOME_CO_TRY(auto result, done->second); BOOST_OUTCOME_CO_TRY(auto result, done->second);
results.emplace(done->first, result);
/* If a top-level goal failed, then kill all other goals /* If a top-level goal failed, then kill all other goals
(unless keepGoing was set). */ (unless keepGoing was set). */
@ -318,12 +313,12 @@ try {
--keep-going *is* set, then they must all be finished now. */ --keep-going *is* set, then they must all be finished now. */
assert(!settings.keepGoing || children.isEmpty()); assert(!settings.keepGoing || children.isEmpty());
co_return result::success(); co_return std::move(results);
} catch (...) { } catch (...) {
co_return result::failure(std::current_exception()); co_return result::failure(std::current_exception());
} }
kj::Promise<Result<void>> Worker::boopGC(LocalStore & localStore) kj::Promise<Result<Worker::Results>> Worker::boopGC(LocalStore & localStore)
try { try {
while (true) { while (true) {
co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS); co_await aio.provider->getTimer().afterDelay(10 * kj::SECONDS);

View file

@ -86,6 +86,7 @@ class Worker : public WorkerBase
{ {
public: public:
using Targets = std::map<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>; using Targets = std::map<GoalPtr, kj::Promise<Result<Goal::WorkResult>>>;
using Results = std::map<GoalPtr, Goal::WorkResult>;
private: private:
@ -154,7 +155,7 @@ private:
/** /**
* Pass current stats counters to the logger for progress bar updates. * Pass current stats counters to the logger for progress bar updates.
*/ */
kj::Promise<Result<void>> updateStatistics(); kj::Promise<Result<Results>> updateStatistics();
AsyncSemaphore statisticsUpdateSignal{1}; AsyncSemaphore statisticsUpdateSignal{1};
std::optional<AsyncSemaphore::Token> statisticsUpdateInhibitor; std::optional<AsyncSemaphore::Token> statisticsUpdateInhibitor;
@ -167,8 +168,8 @@ private:
statisticsUpdateInhibitor = {}; statisticsUpdateInhibitor = {};
} }
kj::Promise<Result<void>> runImpl(Targets topGoals); kj::Promise<Result<Results>> runImpl(Targets topGoals);
kj::Promise<Result<void>> boopGC(LocalStore & localStore); kj::Promise<Result<Results>> boopGC(LocalStore & localStore);
public: public:
@ -265,7 +266,7 @@ public:
/** /**
* Loop until the specified top-level goals have finished. * Loop until the specified top-level goals have finished.
*/ */
std::vector<GoalPtr> run(std::function<Targets (GoalFactory &)> req); Results run(std::function<Targets (GoalFactory &)> req);
/*** /***
* The exit status in case of failure. * The exit status in case of failure.