forked from lix-project/lix
libstore: remove Goal::StillAlive
this was a triumph. i'm making a note here: huge success. it's hard to
overstate my satisfaction! i'm not even angry. i'm being so sincere ri
actually, no. we *are* angry. this was one dumbass odyssey. nobody has
asked for this. but not doing it would have locked us into old, broken
protocols forever or (possibly worse) forced us to write our own async
framework building on the old did-you-mean-continuations in Worker. if
we had done that we'd be locked into ever more, and ever more complex,
manual state management all over the place. this just could not stand.
Change-Id: I43a6de1035febff59d2eff83be9ad52af4659871
This commit is contained in:
parent
86b213e632
commit
896a123605
11 changed files with 46 additions and 76 deletions
|
@ -118,7 +118,7 @@ void DerivationGoal::killChild()
|
|||
}
|
||||
|
||||
|
||||
Goal::Finished DerivationGoal::timedOut(Error && ex)
|
||||
Goal::WorkResult DerivationGoal::timedOut(Error && ex)
|
||||
{
|
||||
killChild();
|
||||
return done(BuildResult::TimedOut, {}, std::move(ex));
|
||||
|
@ -728,7 +728,7 @@ retry:
|
|||
if (!actLock)
|
||||
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
|
||||
fmt("waiting for lock on %s", Magenta(showPaths(lockFiles))));
|
||||
(co_await waitForAWhile()).value();
|
||||
co_await waitForAWhile();
|
||||
// we can loop very often, and `co_return co_await` always allocates a new frame
|
||||
goto retry;
|
||||
}
|
||||
|
@ -799,7 +799,7 @@ retry:
|
|||
actLock = std::make_unique<Activity>(*logger, lvlTalkative, actBuildWaiting,
|
||||
fmt("waiting for a machine to build '%s'", Magenta(worker.store.printStorePath(drvPath))));
|
||||
outputLocks.unlock();
|
||||
(co_await waitForAWhile()).value();
|
||||
co_await waitForAWhile();
|
||||
goto retry;
|
||||
}
|
||||
|
||||
|
@ -1331,7 +1331,7 @@ void DerivationGoal::closeLogFile()
|
|||
}
|
||||
|
||||
|
||||
Goal::Finished DerivationGoal::tooMuchLogs()
|
||||
Goal::WorkResult DerivationGoal::tooMuchLogs()
|
||||
{
|
||||
killChild();
|
||||
return done(
|
||||
|
@ -1380,7 +1380,7 @@ struct DerivationGoal::InputStream final : private kj::AsyncObject
|
|||
}
|
||||
};
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleBuilderOutput(InputStream & in) noexcept
|
||||
try {
|
||||
auto buf = kj::heapArray<char>(4096);
|
||||
while (true) {
|
||||
|
@ -1413,7 +1413,7 @@ try {
|
|||
co_return std::current_exception();
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleHookOutput(InputStream & in) noexcept
|
||||
try {
|
||||
auto buf = kj::heapArray<char>(4096);
|
||||
while (true) {
|
||||
|
@ -1467,7 +1467,7 @@ try {
|
|||
co_return std::current_exception();
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::handleChildOutput() noexcept
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::handleChildOutput() noexcept
|
||||
try {
|
||||
assert(builderOutFD);
|
||||
|
||||
|
@ -1483,7 +1483,7 @@ try {
|
|||
handlers = handlers.exclusiveJoin(
|
||||
worker.aio.provider->getTimer()
|
||||
.afterDelay(settings.buildTimeout.get() * kj::SECONDS)
|
||||
.then([this]() -> Outcome<void, Finished> {
|
||||
.then([this]() -> Outcome<void, WorkResult> {
|
||||
return timedOut(
|
||||
Error("%1% timed out after %2% seconds", name, settings.buildTimeout)
|
||||
);
|
||||
|
@ -1491,7 +1491,7 @@ try {
|
|||
);
|
||||
}
|
||||
|
||||
return handlers.then([this](auto r) -> Outcome<void, Finished> {
|
||||
return handlers.then([this](auto r) -> Outcome<void, WorkResult> {
|
||||
if (!currentLogLine.empty()) flushLine();
|
||||
return r;
|
||||
});
|
||||
|
@ -1499,7 +1499,7 @@ try {
|
|||
return {std::current_exception()};
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() noexcept
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> DerivationGoal::monitorForSilence() noexcept
|
||||
{
|
||||
while (true) {
|
||||
const auto stash = lastChildActivity;
|
||||
|
@ -1513,13 +1513,13 @@ kj::Promise<Outcome<void, Goal::Finished>> DerivationGoal::monitorForSilence() n
|
|||
}
|
||||
}
|
||||
|
||||
kj::Promise<Outcome<void, Goal::Finished>>
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>>
|
||||
DerivationGoal::handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept
|
||||
{
|
||||
lastChildActivity = worker.aio.provider->getTimer().now();
|
||||
|
||||
auto handlers = kj::joinPromisesFailFast([&] {
|
||||
kj::Vector<kj::Promise<Outcome<void, Finished>>> parts{2};
|
||||
kj::Vector<kj::Promise<Outcome<void, WorkResult>>> parts{2};
|
||||
|
||||
parts.add(handleBuilderOutput(builderIn));
|
||||
if (hookIn) {
|
||||
|
@ -1680,7 +1680,7 @@ SingleDrvOutputs DerivationGoal::assertPathValidity()
|
|||
}
|
||||
|
||||
|
||||
Goal::Finished DerivationGoal::done(
|
||||
Goal::WorkResult DerivationGoal::done(
|
||||
BuildResult::Status status,
|
||||
SingleDrvOutputs builtOutputs,
|
||||
std::optional<Error> ex)
|
||||
|
@ -1717,7 +1717,7 @@ Goal::Finished DerivationGoal::done(
|
|||
logError(ex->info());
|
||||
}
|
||||
|
||||
return Finished{
|
||||
return WorkResult{
|
||||
.exitCode = buildResult.success() ? ecSuccess : ecFailed,
|
||||
.result = buildResult,
|
||||
.ex = ex ? std::make_shared<Error>(std::move(*ex)) : nullptr,
|
||||
|
|
|
@ -18,7 +18,7 @@ struct HookInstance;
|
|||
|
||||
struct HookReplyBase {
|
||||
struct [[nodiscard]] Accept {
|
||||
kj::Promise<Outcome<void, Goal::Finished>> promise;
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> promise;
|
||||
};
|
||||
struct [[nodiscard]] Decline {};
|
||||
struct [[nodiscard]] Postpone {};
|
||||
|
@ -248,7 +248,7 @@ struct DerivationGoal : public Goal
|
|||
BuildMode buildMode = bmNormal);
|
||||
virtual ~DerivationGoal() noexcept(false);
|
||||
|
||||
Finished timedOut(Error && ex);
|
||||
WorkResult timedOut(Error && ex);
|
||||
|
||||
kj::Promise<Result<WorkResult>> work() noexcept override;
|
||||
|
||||
|
@ -319,13 +319,13 @@ struct DerivationGoal : public Goal
|
|||
protected:
|
||||
kj::TimePoint lastChildActivity = kj::minValue;
|
||||
|
||||
kj::Promise<Outcome<void, Finished>> handleChildOutput() noexcept;
|
||||
kj::Promise<Outcome<void, Finished>>
|
||||
kj::Promise<Outcome<void, WorkResult>> handleChildOutput() noexcept;
|
||||
kj::Promise<Outcome<void, WorkResult>>
|
||||
handleChildStreams(InputStream & builderIn, InputStream * hookIn) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> handleBuilderOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> handleHookOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, Finished>> monitorForSilence() noexcept;
|
||||
Finished tooMuchLogs();
|
||||
kj::Promise<Outcome<void, WorkResult>> handleBuilderOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, WorkResult>> handleHookOutput(InputStream & in) noexcept;
|
||||
kj::Promise<Outcome<void, WorkResult>> monitorForSilence() noexcept;
|
||||
WorkResult tooMuchLogs();
|
||||
void flushLine();
|
||||
|
||||
public:
|
||||
|
@ -360,7 +360,7 @@ public:
|
|||
|
||||
void started();
|
||||
|
||||
Finished done(
|
||||
WorkResult done(
|
||||
BuildResult::Status status,
|
||||
SingleDrvOutputs builtOutputs = {},
|
||||
std::optional<Error> ex = {});
|
||||
|
|
|
@ -30,7 +30,7 @@ try {
|
|||
|
||||
/* If the derivation already exists, we’re done */
|
||||
if (worker.store.queryRealisation(id)) {
|
||||
co_return Finished{ecSuccess, std::move(buildResult)};
|
||||
co_return WorkResult{ecSuccess, std::move(buildResult)};
|
||||
}
|
||||
|
||||
subs = settings.useSubstitutes ? getDefaultSubstituters() : std::list<ref<Store>>();
|
||||
|
@ -61,7 +61,7 @@ try {
|
|||
/* Hack: don't indicate failure if there were no substituters.
|
||||
In that case the calling derivation should just do a
|
||||
build. */
|
||||
co_return Finished{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)};
|
||||
co_return WorkResult{substituterFailed ? ecFailed : ecNoSubstituters, std::move(buildResult)};
|
||||
}
|
||||
|
||||
sub = subs.front();
|
||||
|
@ -140,7 +140,7 @@ try {
|
|||
|
||||
if (nrFailed > 0) {
|
||||
debug("The output path of the derivation output '%s' could not be substituted", id.to_string());
|
||||
return {Finished{
|
||||
return {WorkResult{
|
||||
nrNoSubstituters > 0 || nrIncompleteClosure > 0 ? ecIncompleteClosure : ecFailed,
|
||||
std::move(buildResult),
|
||||
}};
|
||||
|
@ -155,7 +155,7 @@ try {
|
|||
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::finished() noexcept
|
||||
try {
|
||||
trace("finished");
|
||||
return {Finished{ecSuccess, std::move(buildResult)}};
|
||||
return {WorkResult{ecSuccess, std::move(buildResult)}};
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
}
|
||||
|
|
|
@ -11,18 +11,15 @@ void Goal::trace(std::string_view s)
|
|||
debug("%1%: %2%", name, s);
|
||||
}
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>> Goal::waitForAWhile()
|
||||
try {
|
||||
kj::Promise<void> Goal::waitForAWhile()
|
||||
{
|
||||
trace("wait for a while");
|
||||
/* If we are polling goals that are waiting for a lock, then wake
|
||||
up after a few seconds at most. */
|
||||
co_await worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
|
||||
co_return StillAlive{};
|
||||
} catch (...) {
|
||||
co_return std::current_exception();
|
||||
return worker.aio.provider->getTimer().afterDelay(settings.pollInterval.get() * kj::SECONDS);
|
||||
}
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>>
|
||||
kj::Promise<Result<void>>
|
||||
Goal::waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept
|
||||
try {
|
||||
auto left = dependencies.size();
|
||||
|
@ -45,11 +42,11 @@ try {
|
|||
waiteeDone(dep);
|
||||
|
||||
if (dep->exitCode == ecFailed && !settings.keepGoing) {
|
||||
co_return result::success(StillAlive{});
|
||||
co_return result::success();
|
||||
}
|
||||
}
|
||||
|
||||
co_return result::success(StillAlive{});
|
||||
co_return result::success();
|
||||
} catch (...) {
|
||||
co_return result::failure(std::current_exception());
|
||||
}
|
||||
|
|
|
@ -99,11 +99,7 @@ protected:
|
|||
AsyncSemaphore::Token slotToken;
|
||||
|
||||
public:
|
||||
|
||||
struct Finished;
|
||||
|
||||
struct [[nodiscard]] StillAlive {};
|
||||
struct [[nodiscard]] Finished {
|
||||
struct [[nodiscard]] WorkResult {
|
||||
ExitCode exitCode;
|
||||
BuildResult result;
|
||||
std::shared_ptr<Error> ex;
|
||||
|
@ -113,21 +109,13 @@ public:
|
|||
bool checkMismatch = false;
|
||||
};
|
||||
|
||||
struct [[nodiscard]] WorkResult : std::variant<
|
||||
StillAlive,
|
||||
Finished>
|
||||
{
|
||||
WorkResult() = delete;
|
||||
using variant::variant;
|
||||
};
|
||||
|
||||
protected:
|
||||
kj::Promise<Result<WorkResult>> waitForAWhile();
|
||||
kj::Promise<Result<WorkResult>>
|
||||
kj::Promise<void> waitForAWhile();
|
||||
kj::Promise<Result<void>>
|
||||
waitForGoals(kj::Array<std::pair<GoalPtr, kj::Promise<void>>> dependencies) noexcept;
|
||||
|
||||
template<std::derived_from<Goal>... G>
|
||||
kj::Promise<Result<Goal::WorkResult>>
|
||||
kj::Promise<Result<void>>
|
||||
waitForGoals(std::pair<std::shared_ptr<G>, kj::Promise<void>>... goals) noexcept
|
||||
{
|
||||
return waitForGoals(kj::arrOf<std::pair<GoalPtr, kj::Promise<void>>>(std::move(goals)...));
|
||||
|
|
|
@ -214,7 +214,7 @@ retry:
|
|||
if (!actLock)
|
||||
actLock = std::make_unique<Activity>(*logger, lvlWarn, actBuildWaiting,
|
||||
fmt("waiting for a free build user ID for '%s'", Magenta(worker.store.printStorePath(drvPath))));
|
||||
(co_await waitForAWhile()).value();
|
||||
co_await waitForAWhile();
|
||||
// we can loop very often, and `co_return co_await` always allocates a new frame
|
||||
goto retry;
|
||||
}
|
||||
|
@ -399,7 +399,7 @@ void LocalDerivationGoal::cleanupPostOutputsRegisteredModeNonCheck()
|
|||
|
||||
// NOTE this one isn't noexcept because it's called from places that expect
|
||||
// exceptions to signal failure to launch. we should change this some time.
|
||||
kj::Promise<Outcome<void, Goal::Finished>> LocalDerivationGoal::startBuilder()
|
||||
kj::Promise<Outcome<void, Goal::WorkResult>> LocalDerivationGoal::startBuilder()
|
||||
{
|
||||
if ((buildUser && buildUser->getUIDCount() != 1)
|
||||
#if __linux__
|
||||
|
|
|
@ -218,7 +218,7 @@ struct LocalDerivationGoal : public DerivationGoal
|
|||
/**
|
||||
* Start building a derivation.
|
||||
*/
|
||||
kj::Promise<Outcome<void, Finished>> startBuilder();
|
||||
kj::Promise<Outcome<void, WorkResult>> startBuilder();
|
||||
|
||||
/**
|
||||
* Fill in the environment for the builder.
|
||||
|
|
|
@ -32,7 +32,7 @@ PathSubstitutionGoal::~PathSubstitutionGoal()
|
|||
}
|
||||
|
||||
|
||||
Goal::Finished PathSubstitutionGoal::done(
|
||||
Goal::WorkResult PathSubstitutionGoal::done(
|
||||
ExitCode result,
|
||||
BuildResult::Status status,
|
||||
std::optional<std::string> errorMsg)
|
||||
|
@ -42,7 +42,7 @@ Goal::Finished PathSubstitutionGoal::done(
|
|||
debug(*errorMsg);
|
||||
buildResult.errorMsg = *errorMsg;
|
||||
}
|
||||
return Finished{result, std::move(buildResult)};
|
||||
return WorkResult{result, std::move(buildResult)};
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ struct PathSubstitutionGoal : public Goal
|
|||
*/
|
||||
std::optional<ContentAddress> ca;
|
||||
|
||||
Finished done(
|
||||
WorkResult done(
|
||||
ExitCode result,
|
||||
BuildResult::Status status,
|
||||
std::optional<std::string> errorMsg = {});
|
||||
|
|
|
@ -193,7 +193,7 @@ static void removeGoal(std::shared_ptr<G> goal, auto & goalMap)
|
|||
}
|
||||
|
||||
|
||||
void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
|
||||
void Worker::goalFinished(GoalPtr goal, Goal::WorkResult & f)
|
||||
{
|
||||
goal->trace("done");
|
||||
assert(!goal->exitCode.has_value());
|
||||
|
@ -210,20 +210,6 @@ void Worker::goalFinished(GoalPtr goal, Goal::Finished & f)
|
|||
goal->cleanup();
|
||||
}
|
||||
|
||||
void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
|
||||
{
|
||||
std::visit(
|
||||
overloaded{
|
||||
[&](Goal::StillAlive) {
|
||||
childStarted(goal, kj::evalLater([goal] { return goal->work(); }));
|
||||
},
|
||||
[&](Goal::Finished & f) { goalFinished(goal, f); },
|
||||
},
|
||||
how
|
||||
);
|
||||
updateStatistics();
|
||||
}
|
||||
|
||||
void Worker::removeGoal(GoalPtr goal)
|
||||
{
|
||||
if (auto drvGoal = std::dynamic_pointer_cast<DerivationGoal>(goal))
|
||||
|
@ -250,7 +236,7 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
|
|||
children.add(promise
|
||||
.then([this, goal](auto result) {
|
||||
if (result.has_value()) {
|
||||
handleWorkResult(goal, std::move(result.assume_value()));
|
||||
goalFinished(goal, result.assume_value());
|
||||
} else {
|
||||
childException = result.assume_error();
|
||||
}
|
||||
|
|
|
@ -139,8 +139,7 @@ private:
|
|||
*/
|
||||
bool checkMismatch = false;
|
||||
|
||||
void goalFinished(GoalPtr goal, Goal::Finished & f);
|
||||
void handleWorkResult(GoalPtr goal, Goal::WorkResult how);
|
||||
void goalFinished(GoalPtr goal, Goal::WorkResult & f);
|
||||
|
||||
kj::Own<kj::PromiseFulfiller<void>> childFinished;
|
||||
|
||||
|
|
Loading…
Reference in a new issue