Kill active build steps when builds are cancelled

We now kill active build steps when there are no more referring
builds. This is useful e.g. for preventing cancelled multi-hour TPC-H
benchmark runs from hogging build machines.
This commit is contained in:
Eelco Dolstra 2016-10-31 14:58:29 +01:00
parent a816ef873d
commit b3169ce438
6 changed files with 111 additions and 47 deletions

View file

@ -13,7 +13,14 @@ void State::builder(MachineReservation::ptr reservation)
nrStepsStarted++;
MaintainCount mc(nrActiveSteps);
reservation->threadId = pthread_self();
activeSteps_.lock()->insert(reservation);
Finally removeActiveStep([&]() {
reservation->threadId = -1;
activeSteps_.lock()->erase(reservation);
});
auto step = reservation->step;
@ -63,8 +70,15 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
purpose of creating build steps. We could create a build step
record for every build, but that could be very expensive
(e.g. a stdenv derivation can be a dependency of tens of
thousands of builds), so we don't. */
Build::ptr build;
thousands of builds), so we don't.
We don't keep a Build::ptr here to allow
State::processQueueChange() to detect whether a step can be
cancelled (namely if there are no more Builds referring to
it). */
BuildID buildId;
Path buildDrvPath;
unsigned int maxSilentTime, buildTimeout;
{
std::set<Build::ptr> dependents;
@ -85,6 +99,8 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
return sMaybeCancelled;
}
Build::ptr build;
for (auto build2 : dependents) {
if (build2->drvPath == step->drvPath) {
build = build2;
@ -93,11 +109,16 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
}
if (!build) build = *dependents.begin();
buildId = build->id;
buildDrvPath = build->drvPath;
maxSilentTime = build->maxSilentTime;
buildTimeout = build->buildTimeout;
printMsg(lvlInfo, format("performing step %1% on %2% (needed by build %3% and %4% others)")
% step->drvPath % machine->sshName % build->id % (dependents.size() - 1));
% step->drvPath % machine->sshName % buildId % (dependents.size() - 1));
}
bool quit = build->id == buildOne && step->drvPath == build->drvPath;
bool quit = buildId == buildOne && step->drvPath == buildDrvPath;
auto conn(dbPool.get());
@ -108,9 +129,9 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
Finally clearStep([&]() {
if (stepNr && !stepFinished) {
printError("marking step %d of build %d as orphaned", stepNr, build->id);
printError("marking step %d of build %d as orphaned", stepNr, buildId);
auto orphanedSteps_(orphanedSteps.lock());
orphanedSteps_->emplace(build->id, stepNr);
orphanedSteps_->emplace(buildId, stepNr);
}
});
@ -127,20 +148,33 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
{
auto mc = startDbUpdate();
pqxx::work txn(*conn);
stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bsBusy);
stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy);
txn.commit();
}
/* Do the build. */
try {
/* FIXME: referring builds may have conflicting timeouts. */
buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result);
buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result);
} catch (NoTokens & e) {
result.stepStatus = bsNarSizeLimitExceeded;
} catch (Error & e) {
result.stepStatus = bsAborted;
result.errorMsg = e.msg();
result.canRetry = true;
} catch (__cxxabiv1::__forced_unwind & e) {
/* The queue monitor thread cancelled this step. */
try {
printInfo("marking step %d of build %d as succeeded", stepNr, buildId);
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, time(0), result.overhead, buildId,
stepNr, machine->sshName, bsCancelled, "");
txn.commit();
stepFinished = true;
} catch (...) {
ignoreException();
}
throw;
}
if (result.stepStatus == bsSuccess)
@ -167,7 +201,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
{
auto logCompressorQueue_(logCompressorQueue.lock());
assert(stepNr);
logCompressorQueue_->push({build->id, stepNr, result.logFile});
logCompressorQueue_->push({buildId, stepNr, result.logFile});
}
logCompressorWakeup.notify_one();
}
@ -187,7 +221,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
auto mc = startDbUpdate();
{
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id,
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, buildId,
stepNr, machine->sshName, result.stepStatus, result.errorMsg);
txn.commit();
}
@ -243,11 +277,11 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
pqxx::work txn(*conn);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead,
build->id, stepNr, machine->sshName, bsSuccess);
buildId, stepNr, machine->sshName, bsSuccess);
for (auto & b : direct) {
printMsg(lvlInfo, format("marking build %1% as succeeded") % b->id);
markSucceededBuild(txn, b, res, build != b || result.isCached,
markSucceededBuild(txn, b, res, buildId != b->id || result.isCached,
result.startTime, result.stopTime);
}
@ -340,17 +374,17 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
redundant with the build's isCachedBuild field). */
for (auto & build2 : indirect) {
if ((result.stepStatus == bsCachedFailure && build2->drvPath == step->drvPath) ||
(result.stepStatus != bsCachedFailure && build == build2) ||
(result.stepStatus != bsCachedFailure && buildId == build2->id) ||
build2->finishedInDB)
continue;
createBuildStep(txn, 0, build2, step, machine->sshName,
result.stepStatus, result.errorMsg, build == build2 ? 0 : build->id);
createBuildStep(txn, 0, build2->id, step, machine->sshName,
result.stepStatus, result.errorMsg, buildId == build2->id ? 0 : buildId);
}
if (result.stepStatus != bsCachedFailure && !stepFinished) {
assert(stepNr);
finishBuildStep(txn, result.startTime, result.stopTime, result.overhead,
build->id, stepNr, machine->sshName, result.stepStatus, result.errorMsg);
buildId, stepNr, machine->sshName, result.stepStatus, result.errorMsg);
}
/* Mark all builds that depend on this derivation as failed. */
@ -392,7 +426,7 @@ State::StepResult State::doBuildStep(nix::ref<Store> destStore, Step::ptr step,
/* Send notification about this build and its dependents. */
{
auto notificationSenderQueue_(notificationSenderQueue.lock());
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, build->id, dependentIDs});
notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs});
}
notificationSenderWakeup.notify_one();

View file

@ -224,25 +224,25 @@ void State::clearBusy(Connection & conn, time_t stopTime)
}
unsigned int State::allocBuildStep(pqxx::work & txn, Build::ptr build)
unsigned int State::allocBuildStep(pqxx::work & txn, BuildID buildId)
{
/* Acquire an exclusive lock on BuildSteps to ensure that we don't
race with other threads creating a step of the same build. */
txn.exec("lock table BuildSteps in exclusive mode");
auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec();
auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(buildId).exec();
return res[0][0].is_null() ? 1 : res[0][0].as<int>() + 1;
}
unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom)
{
unsigned int stepNr = allocBuildStep(txn, build);
auto stepNr = allocBuildStep(txn, buildId);
txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)")
(build->id)
(buildId)
(stepNr)
(0) // == build
(step->drvPath)
@ -258,7 +258,7 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::p
for (auto & output : step->drv.outputs)
txn.parameterized
("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)")
(build->id)(stepNr)(output.first)(output.second.path).exec();
(buildId)(stepNr)(output.first)(output.second.path).exec();
return stepNr;
}
@ -284,7 +284,7 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime,
Build::ptr build, const Path & drvPath, const string & outputName, const Path & storePath)
{
int stepNr = allocBuildStep(txn, build);
auto stepNr = allocBuildStep(txn, build->id);
txn.parameterized
("insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8)")
@ -574,7 +574,7 @@ void State::dumpStatus(Connection & conn, bool log)
if (i->lock()) ++i; else i = runnable_->erase(i);
root.attr("nrRunnableSteps", runnable_->size());
}
root.attr("nrActiveSteps", nrActiveSteps);
root.attr("nrActiveSteps", activeSteps_.lock()->size());
root.attr("nrStepsBuilding", nrStepsBuilding);
root.attr("nrStepsCopyingTo", nrStepsCopyingTo);
root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom);

View file

@ -2,6 +2,8 @@
#include "build-result.hh"
#include "globals.hh"
#include <cstring>
using namespace nix;
@ -180,7 +182,7 @@ bool State::getQueuedBuilds(Connection & conn,
}
}
createBuildStep(txn, 0, build, ex.step, "", bsCachedFailure, "", propagatedFrom);
createBuildStep(txn, 0, build->id, ex.step, "", bsCachedFailure, "", propagatedFrom);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = 1 where id = $1 and finished = 0")
(build->id)
@ -312,22 +314,44 @@ void State::processQueueChange(Connection & conn)
currentIds[row["id"].as<BuildID>()] = row["globalPriority"].as<BuildID>();
}
auto builds_(builds.lock());
{
auto builds_(builds.lock());
for (auto i = builds_->begin(); i != builds_->end(); ) {
auto b = currentIds.find(i->first);
if (b == currentIds.end()) {
printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
i = builds_->erase(i);
// FIXME: ideally we would interrupt active build steps here.
continue;
for (auto i = builds_->begin(); i != builds_->end(); ) {
auto b = currentIds.find(i->first);
if (b == currentIds.end()) {
printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
i = builds_->erase(i);
// FIXME: ideally we would interrupt active build steps here.
continue;
}
if (i->second->globalPriority < b->second) {
printMsg(lvlInfo, format("priority of build %1% increased") % i->first);
i->second->globalPriority = b->second;
i->second->propagatePriorities();
}
++i;
}
if (i->second->globalPriority < b->second) {
printMsg(lvlInfo, format("priority of build %1% increased") % i->first);
i->second->globalPriority = b->second;
i->second->propagatePriorities();
}
{
auto activeSteps(activeSteps_.lock());
for (auto & activeStep : *activeSteps) {
auto threadId = activeStep->threadId; // FIXME: use Sync or atomic?
if (threadId == 0) continue;
std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(activeStep->step, dependents, steps);
if (!dependents.empty()) continue;
printInfo("cancelling thread for build step %s", activeStep->step->drvPath);
int err = pthread_cancel(threadId);
if (err)
printError("error cancelling thread for build step %s: %s",
activeStep->step->drvPath, strerror(err));
}
++i;
}
}

View file

@ -28,6 +28,7 @@ typedef enum {
bsFailed = 1,
bsDepFailed = 2, // builds only
bsAborted = 3,
bsCancelled = 4,
bsFailedWithOutput = 6, // builds only
bsTimedOut = 7,
bsCachedFailure = 8, // steps only
@ -296,7 +297,6 @@ private:
counter nrBuildsDone{0};
counter nrStepsStarted{0};
counter nrStepsDone{0};
counter nrActiveSteps{0};
counter nrStepsBuilding{0};
counter nrStepsCopyingTo{0};
counter nrStepsCopyingFrom{0};
@ -370,10 +370,14 @@ private:
State & state;
Step::ptr step;
Machine::ptr machine;
pthread_t threadId = 0;
bool cancelled = false;
MachineReservation(State & state, Step::ptr step, Machine::ptr machine);
~MachineReservation();
};
nix::Sync<std::set<std::shared_ptr<MachineReservation>>> activeSteps_;
std::atomic<time_t> lastDispatcherCheck{0};
std::shared_ptr<nix::Store> localStore;
@ -413,9 +417,9 @@ private:
/* Thread to reload /etc/nix/machines periodically. */
void monitorMachinesFile();
unsigned int allocBuildStep(pqxx::work & txn, Build::ptr build);
unsigned int allocBuildStep(pqxx::work & txn, BuildID buildId);
unsigned int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step,
unsigned int createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step,
const std::string & machine, BuildStatus status, const std::string & errorMsg = "",
BuildID propagatedFrom = 0);

View file

@ -23,7 +23,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END;
</thead>
<tbody>
[% FOREACH step IN steps %]
[% IF ( type == "All" ) || ( type == "Failed" && step.status != 0 ) || ( type == "Running" && step.busy == 1 ) %]
[% IF ( type == "All" ) || ( type == "Failed" && step.busy == 0 && step.status != 0 ) || ( type == "Running" && step.busy == 1 ) %]
[% has_log = seen.${step.drvpath} ? 0 : buildStepLogExists(step);
seen.${step.drvpath} = 1;
log = c.uri_for('/build' build.id 'nixlog' step.stepnr); %]
@ -49,7 +49,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END;
INCLUDE renderDuration duration = curTime - step.starttime;
END %]
</td>
<td>[% IF step.busy == 1 || ((step.machine || step.starttime) && (step.status == 0 || step.status == 1 || step.status == 3 || step.status == 7)); INCLUDE renderMachineName machine=step.machine; ELSE; "<em>n/a</em>"; END %]</td>
<td>[% IF step.busy == 1 || ((step.machine || step.starttime) && (step.status == 0 || step.status == 1 || step.status == 3 || step.status == 4 || step.status == 7)); INCLUDE renderMachineName machine=step.machine; ELSE; "<em>n/a</em>"; END %]</td>
<td>
[% IF step.busy == 1 %]
<strong>Building</strong>
@ -57,6 +57,8 @@ FOR step IN steps; IF step.busy; busy = 1; END; END;
Succeeded
[% ELSIF step.status == 3 %]
<span class="error"><strong>Aborted</strong>[% IF step.errormsg %]: [% HTML.escape(step.errormsg); END %]</span>
[% ELSIF step.status == 4 %]
<span class="error">Cancelled</span>
[% ELSIF step.status == 7 %]
<span class="error">Timed out</span>
[% ELSIF step.status == 8 %]
@ -242,7 +244,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END;
[% IF build.finished %]
[% IF steps && build.buildstatus != 0 && build.buildstatus != 6 %]
[% IF steps && build.buildstatus != 0 && build.buildstatus != 4 && build.buildstatus != 6 %]
<h3>Failed build steps</h3>
[% INCLUDE renderBuildSteps type="Failed" %]
[% END %]

View file

@ -203,7 +203,7 @@ create table Builds (
-- 1 = regular Nix failure (derivation returned non-zero exit code)
-- 2 = build of a dependency failed [builds only]
-- 3 = build or step aborted due to misc failure
-- 4 = build cancelled (removed from queue; never built) [builds only]
-- 4 = build or step cancelled
-- 5 = [obsolete]
-- 6 = failure with output (i.e. $out/nix-support/failed exists) [builds only]
-- 7 = build timed out