forked from lix-project/lix
Fix tests on systems with a non-master git defaultBranch #1
|
@ -134,9 +134,9 @@ Goal::Finished DerivationGoal::timedOut(Error && ex)
|
|||
}
|
||||
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work(bool inBuildSlot) noexcept
|
||||
kj::Promise<Result<Goal::WorkResult>> DerivationGoal::work() noexcept
|
||||
{
|
||||
return (this->*state)(inBuildSlot);
|
||||
return (this->*state)(slotToken.valid());
|
||||
}
|
||||
|
||||
void DerivationGoal::addWantedOutputs(const OutputsSpec & outputs)
|
||||
|
@ -783,7 +783,7 @@ try {
|
|||
buildResult.startTime = time(0); // inexact
|
||||
state = &DerivationGoal::buildDone;
|
||||
started();
|
||||
return {{WaitForWorld{std::move(a.promise), false}}};
|
||||
return {{WaitForWorld{std::move(a.promise)}}};
|
||||
},
|
||||
[&](HookReply::Postpone) -> std::optional<kj::Promise<Result<WorkResult>>> {
|
||||
/* Not now; wait until at least one child finishes or
|
||||
|
@ -980,6 +980,7 @@ kj::Promise<Result<Goal::WorkResult>> DerivationGoal::buildDone(bool inBuildSlot
|
|||
try {
|
||||
trace("build done");
|
||||
|
||||
slotToken = {};
|
||||
Finally releaseBuildUser([&](){ this->cleanupHookFinally(); });
|
||||
|
||||
cleanupPreChildKill();
|
||||
|
|
|
@ -249,7 +249,7 @@ struct DerivationGoal : public Goal
|
|||
|
||||
std::string key() override;
|
||||
|
||||
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
|
||||
kj::Promise<Result<WorkResult>> work() noexcept override;
|
||||
|
||||
/**
|
||||
* Add wanted outputs to an already existing derivation goal.
|
||||
|
|
|
@ -42,7 +42,10 @@ try {
|
|||
trace("trying next substituter");
|
||||
|
||||
if (!inBuildSlot) {
|
||||
return {WaitForSlot{}};
|
||||
return worker.substitutions.acquire().then([this](auto token) {
|
||||
slotToken = std::move(token);
|
||||
return work();
|
||||
});
|
||||
}
|
||||
|
||||
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
|
||||
|
@ -81,7 +84,7 @@ try {
|
|||
|
||||
state = &DrvOutputSubstitutionGoal::realisationFetched;
|
||||
return {WaitForWorld{
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
|
||||
}};
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
|
@ -90,6 +93,7 @@ try {
|
|||
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::realisationFetched(bool inBuildSlot) noexcept
|
||||
try {
|
||||
maintainRunningSubstitutions.reset();
|
||||
slotToken = {};
|
||||
|
||||
try {
|
||||
outputInfo = downloadState->result.get();
|
||||
|
@ -168,9 +172,9 @@ std::string DrvOutputSubstitutionGoal::key()
|
|||
return "a$" + std::string(id.to_string());
|
||||
}
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work(bool inBuildSlot) noexcept
|
||||
kj::Promise<Result<Goal::WorkResult>> DrvOutputSubstitutionGoal::work() noexcept
|
||||
{
|
||||
return (this->*state)(inBuildSlot);
|
||||
return (this->*state)(slotToken.valid());
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ public:
|
|||
|
||||
std::string key() override;
|
||||
|
||||
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
|
||||
kj::Promise<Result<WorkResult>> work() noexcept override;
|
||||
|
||||
JobCategory jobCategory() const override {
|
||||
return JobCategory::Substitution;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
///@file
|
||||
|
||||
#include "async-semaphore.hh"
|
||||
#include "result.hh"
|
||||
#include "types.hh"
|
||||
#include "store-api.hh"
|
||||
|
@ -112,19 +113,20 @@ struct Goal
|
|||
*/
|
||||
BuildResult buildResult;
|
||||
|
||||
protected:
|
||||
AsyncSemaphore::Token slotToken;
|
||||
|
||||
public:
|
||||
|
||||
struct Finished;
|
||||
|
||||
struct [[nodiscard]] StillAlive {};
|
||||
struct [[nodiscard]] WaitForSlot {};
|
||||
struct [[nodiscard]] ContinueImmediately {};
|
||||
struct [[nodiscard]] WaitForGoals {
|
||||
Goals goals;
|
||||
};
|
||||
struct [[nodiscard]] WaitForWorld {
|
||||
kj::Promise<Outcome<void, Finished>> promise;
|
||||
bool inBuildSlot;
|
||||
};
|
||||
struct [[nodiscard]] Finished {
|
||||
ExitCode exitCode;
|
||||
|
@ -138,7 +140,6 @@ public:
|
|||
|
||||
struct [[nodiscard]] WorkResult : std::variant<
|
||||
StillAlive,
|
||||
WaitForSlot,
|
||||
ContinueImmediately,
|
||||
WaitForGoals,
|
||||
WaitForWorld,
|
||||
|
@ -168,7 +169,7 @@ public:
|
|||
trace("goal destroyed");
|
||||
}
|
||||
|
||||
virtual kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept = 0;
|
||||
virtual kj::Promise<Result<WorkResult>> work() noexcept = 0;
|
||||
|
||||
virtual void waiteeDone(GoalPtr waitee) { }
|
||||
|
||||
|
|
|
@ -156,8 +156,11 @@ try {
|
|||
if (!inBuildSlot) {
|
||||
state = &DerivationGoal::tryToBuild;
|
||||
outputLocks.unlock();
|
||||
if (0U != settings.maxBuildJobs) {
|
||||
return {WaitForSlot{}};
|
||||
if (worker.localBuilds.capacity() > 0) {
|
||||
return worker.localBuilds.acquire().then([this](auto token) {
|
||||
slotToken = std::move(token);
|
||||
return work();
|
||||
});
|
||||
}
|
||||
if (getMachines().empty()) {
|
||||
throw Error(
|
||||
|
@ -248,7 +251,7 @@ try {
|
|||
state = &DerivationGoal::buildDone;
|
||||
|
||||
started();
|
||||
return {WaitForWorld{std::move(promise), true}};
|
||||
return {WaitForWorld{std::move(promise)}};
|
||||
|
||||
} catch (BuildError & e) {
|
||||
outputLocks.unlock();
|
||||
|
|
|
@ -45,9 +45,9 @@ Goal::Finished PathSubstitutionGoal::done(
|
|||
}
|
||||
|
||||
|
||||
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work(bool inBuildSlot) noexcept
|
||||
kj::Promise<Result<Goal::WorkResult>> PathSubstitutionGoal::work() noexcept
|
||||
{
|
||||
return (this->*state)(inBuildSlot);
|
||||
return (this->*state)(slotToken.valid());
|
||||
}
|
||||
|
||||
|
||||
|
@ -203,7 +203,10 @@ try {
|
|||
trace("trying to run");
|
||||
|
||||
if (!inBuildSlot) {
|
||||
return {WaitForSlot{}};
|
||||
return worker.substitutions.acquire().then([this](auto token) {
|
||||
slotToken = std::move(token);
|
||||
return work();
|
||||
});
|
||||
}
|
||||
|
||||
maintainRunningSubstitutions = worker.runningSubstitutions.addTemporarily(1);
|
||||
|
@ -236,7 +239,7 @@ try {
|
|||
|
||||
state = &PathSubstitutionGoal::finished;
|
||||
return {WaitForWorld{
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); }), true
|
||||
pipe.promise.then([]() -> Outcome<void, Finished> { return result::success(); })
|
||||
}};
|
||||
} catch (...) {
|
||||
return {std::current_exception()};
|
||||
|
@ -248,6 +251,7 @@ try {
|
|||
trace("substitute finished");
|
||||
|
||||
try {
|
||||
slotToken = {};
|
||||
thr.get();
|
||||
} catch (std::exception & e) {
|
||||
printError(e.what());
|
||||
|
|
|
@ -99,7 +99,7 @@ public:
|
|||
return "a$" + std::string(storePath.name()) + "$" + worker.store.printStorePath(storePath);
|
||||
}
|
||||
|
||||
kj::Promise<Result<WorkResult>> work(bool inBuildSlot) noexcept override;
|
||||
kj::Promise<Result<WorkResult>> work() noexcept override;
|
||||
|
||||
/**
|
||||
* The states.
|
||||
|
|
|
@ -27,11 +27,13 @@ Worker::Worker(Store & store, Store & evalStore, kj::AsyncIoContext & aio)
|
|||
, store(store)
|
||||
, evalStore(evalStore)
|
||||
, aio(aio)
|
||||
/* Make sure that we are always allowed to run at least one substitution.
|
||||
This prevents infinite waiting. */
|
||||
, substitutions(std::max<unsigned>(1, settings.maxSubstitutionJobs))
|
||||
, localBuilds(settings.maxBuildJobs)
|
||||
, children(errorHandler)
|
||||
{
|
||||
/* Debugging: prevent recursive workers. */
|
||||
nrLocalBuilds = 0;
|
||||
nrSubstitutions = 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -210,7 +212,6 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
|
|||
std::visit(
|
||||
overloaded{
|
||||
[&](Goal::StillAlive) {},
|
||||
[&](Goal::WaitForSlot) { waitForBuildSlot(goal); },
|
||||
[&](Goal::ContinueImmediately) { wakeUp(goal); },
|
||||
[&](Goal::WaitForGoals & w) {
|
||||
for (auto & dep : w.goals) {
|
||||
|
@ -219,9 +220,7 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
|
|||
}
|
||||
},
|
||||
[&](Goal::WaitForWorld & w) {
|
||||
childStarted(
|
||||
goal,
|
||||
w.promise.then([](auto r) -> Result<Goal::WorkResult> {
|
||||
childStarted(goal, w.promise.then([](auto r) -> Result<Goal::WorkResult> {
|
||||
if (r.has_value()) {
|
||||
return {Goal::ContinueImmediately{}};
|
||||
} else if (r.has_error()) {
|
||||
|
@ -229,9 +228,7 @@ void Worker::handleWorkResult(GoalPtr goal, Goal::WorkResult how)
|
|||
} else {
|
||||
return r.exception();
|
||||
}
|
||||
}),
|
||||
w.inBuildSlot
|
||||
);
|
||||
}));
|
||||
},
|
||||
[&](Goal::Finished & f) { goalFinished(goal, f); },
|
||||
},
|
||||
|
@ -268,8 +265,7 @@ void Worker::wakeUp(GoalPtr goal)
|
|||
}
|
||||
|
||||
|
||||
void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
|
||||
bool inBuildSlot)
|
||||
void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise)
|
||||
{
|
||||
children.add(promise
|
||||
.then([this, goal](auto result) {
|
||||
|
@ -279,64 +275,17 @@ void Worker::childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> pr
|
|||
childException = result.assume_error();
|
||||
}
|
||||
})
|
||||
.attach(Finally{[this, goal, inBuildSlot] {
|
||||
childTerminated(goal, inBuildSlot);
|
||||
.attach(Finally{[this, goal] {
|
||||
childTerminated(goal);
|
||||
}}));
|
||||
if (inBuildSlot) {
|
||||
switch (goal->jobCategory()) {
|
||||
case JobCategory::Substitution:
|
||||
nrSubstitutions++;
|
||||
break;
|
||||
case JobCategory::Build:
|
||||
nrLocalBuilds++;
|
||||
break;
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void Worker::childTerminated(GoalPtr goal, bool inBuildSlot)
|
||||
void Worker::childTerminated(GoalPtr goal)
|
||||
{
|
||||
if (childFinished) {
|
||||
childFinished->fulfill();
|
||||
}
|
||||
|
||||
if (inBuildSlot) {
|
||||
switch (goal->jobCategory()) {
|
||||
case JobCategory::Substitution:
|
||||
assert(nrSubstitutions > 0);
|
||||
nrSubstitutions--;
|
||||
break;
|
||||
case JobCategory::Build:
|
||||
assert(nrLocalBuilds > 0);
|
||||
nrLocalBuilds--;
|
||||
break;
|
||||
default:
|
||||
abort();
|
||||
}
|
||||
}
|
||||
|
||||
/* Wake up goals waiting for a build slot. */
|
||||
for (auto & j : wantingToBuild) {
|
||||
GoalPtr goal = j.lock();
|
||||
if (goal) wakeUp(goal);
|
||||
}
|
||||
|
||||
wantingToBuild.clear();
|
||||
}
|
||||
|
||||
|
||||
void Worker::waitForBuildSlot(GoalPtr goal)
|
||||
{
|
||||
goal->trace("wait for build slot");
|
||||
bool isSubstitutionGoal = goal->jobCategory() == JobCategory::Substitution;
|
||||
if ((!isSubstitutionGoal && nrLocalBuilds < settings.maxBuildJobs) ||
|
||||
(isSubstitutionGoal && nrSubstitutions < settings.maxSubstitutionJobs))
|
||||
wakeUp(goal); /* we can do it right away */
|
||||
else
|
||||
wantingToBuild.insert(goal);
|
||||
}
|
||||
|
||||
|
||||
|
@ -394,16 +343,11 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
|||
awake.clear();
|
||||
for (auto & goal : awake2) {
|
||||
checkInterrupt();
|
||||
/* Make sure that we are always allowed to run at least one substitution.
|
||||
This prevents infinite waiting. */
|
||||
const bool inSlot = goal->jobCategory() == JobCategory::Substitution
|
||||
? nrSubstitutions < std::max(1U, (unsigned int) settings.maxSubstitutionJobs)
|
||||
: nrLocalBuilds < settings.maxBuildJobs;
|
||||
auto result = goal->work(inSlot);
|
||||
auto result = goal->work();
|
||||
if (result.poll(aio.waitScope)) {
|
||||
handleWorkResult(goal, result.wait(aio.waitScope).value());
|
||||
} else {
|
||||
childStarted(goal, std::move(result), false);
|
||||
childStarted(goal, std::move(result));
|
||||
}
|
||||
|
||||
if (topGoals.empty()) break; // stuff may have been cancelled
|
||||
|
@ -428,7 +372,6 @@ Goals Worker::run(std::function<Goals (GoalFactory &)> req)
|
|||
exited while some of its subgoals were still active. But if
|
||||
--keep-going *is* set, then they must all be finished now. */
|
||||
assert(!settings.keepGoing || awake.empty());
|
||||
assert(!settings.keepGoing || wantingToBuild.empty());
|
||||
assert(!settings.keepGoing || children.isEmpty());
|
||||
|
||||
return _topGoals;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
///@file
|
||||
|
||||
#include "async-semaphore.hh"
|
||||
#include "notifying-counter.hh"
|
||||
#include "types.hh"
|
||||
#include "lock.hh"
|
||||
|
@ -93,22 +94,6 @@ private:
|
|||
*/
|
||||
WeakGoals awake;
|
||||
|
||||
/**
|
||||
* Goals waiting for a build slot.
|
||||
*/
|
||||
WeakGoals wantingToBuild;
|
||||
|
||||
/**
|
||||
* Number of build slots occupied. This includes local builds but does not
|
||||
* include substitutions or remote builds via the build hook.
|
||||
*/
|
||||
unsigned int nrLocalBuilds;
|
||||
|
||||
/**
|
||||
* Number of substitution slots occupied.
|
||||
*/
|
||||
unsigned int nrSubstitutions;
|
||||
|
||||
/**
|
||||
* Maps used to prevent multiple instantiations of a goal for the
|
||||
* same derivation / path.
|
||||
|
@ -148,12 +133,6 @@ private:
|
|||
|
||||
kj::Own<kj::PromiseFulfiller<void>> childFinished;
|
||||
|
||||
/**
|
||||
* Put `goal` to sleep until a build slot becomes available (which
|
||||
* might be right away).
|
||||
*/
|
||||
void waitForBuildSlot(GoalPtr goal);
|
||||
|
||||
/**
|
||||
* Wake up a goal (i.e., there is something for it to do).
|
||||
*/
|
||||
|
@ -170,16 +149,14 @@ private:
|
|||
void removeGoal(GoalPtr goal);
|
||||
|
||||
/**
|
||||
* Registers a running child process. `inBuildSlot` means that
|
||||
* the process counts towards the jobs limit.
|
||||
* Registers a running child process.
|
||||
*/
|
||||
void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise,
|
||||
bool inBuildSlot);
|
||||
void childStarted(GoalPtr goal, kj::Promise<Result<Goal::WorkResult>> promise);
|
||||
|
||||
/**
|
||||
* Unregisters a running child process.
|
||||
*/
|
||||
void childTerminated(GoalPtr goal, bool inBuildSlot);
|
||||
void childTerminated(GoalPtr goal);
|
||||
|
||||
/**
|
||||
* Pass current stats counters to the logger for progress bar updates.
|
||||
|
@ -205,6 +182,7 @@ public:
|
|||
Store & store;
|
||||
Store & evalStore;
|
||||
kj::AsyncIoContext & aio;
|
||||
AsyncSemaphore substitutions, localBuilds;
|
||||
|
||||
private:
|
||||
kj::TaskSet children;
|
||||
|
|
Loading…
Reference in a new issue