From b3169ce4389701fc77ee3e142ebb0cdd7a9fea18 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Mon, 31 Oct 2016 14:58:29 +0100 Subject: [PATCH] Kill active build steps when builds are cancelled We now kill active build steps when there are no more referring builds. This is useful e.g. for preventing cancelled multi-hour TPC-H benchmark runs from hogging build machines. --- src/hydra-queue-runner/builder.cc | 70 +++++++++++++++----- src/hydra-queue-runner/hydra-queue-runner.cc | 16 ++--- src/hydra-queue-runner/queue-monitor.cc | 52 +++++++++++---- src/hydra-queue-runner/state.hh | 10 ++- src/root/build.tt | 8 ++- src/sql/hydra.sql | 2 +- 6 files changed, 111 insertions(+), 47 deletions(-) diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 99e5e6fd..9f01425d 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -13,7 +13,14 @@ void State::builder(MachineReservation::ptr reservation) nrStepsStarted++; - MaintainCount mc(nrActiveSteps); + reservation->threadId = pthread_self(); + + activeSteps_.lock()->insert(reservation); + + Finally removeActiveStep([&]() { + reservation->threadId = -1; + activeSteps_.lock()->erase(reservation); + }); auto step = reservation->step; @@ -63,8 +70,15 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, purpose of creating build steps. We could create a build step record for every build, but that could be very expensive (e.g. a stdenv derivation can be a dependency of tens of - thousands of builds), so we don't. */ - Build::ptr build; + thousands of builds), so we don't. + + We don't keep a Build::ptr here to allow + State::processQueueChange() to detect whether a step can be + cancelled (namely if there are no more Builds referring to + it). 
*/ + BuildID buildId; + Path buildDrvPath; + unsigned int maxSilentTime, buildTimeout; { std::set dependents; @@ -85,6 +99,8 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, return sMaybeCancelled; } + Build::ptr build; + for (auto build2 : dependents) { if (build2->drvPath == step->drvPath) { build = build2; @@ -93,11 +109,16 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, } if (!build) build = *dependents.begin(); + buildId = build->id; + buildDrvPath = build->drvPath; + maxSilentTime = build->maxSilentTime; + buildTimeout = build->buildTimeout; + printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)") - % step->drvPath % machine->sshName % build->id % (dependents.size() - 1)); + % step->drvPath % machine->sshName % buildId % (dependents.size() - 1)); } - bool quit = build->id == buildOne && step->drvPath == build->drvPath; + bool quit = buildId == buildOne && step->drvPath == buildDrvPath; auto conn(dbPool.get()); @@ -108,9 +129,9 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, Finally clearStep([&]() { if (stepNr && !stepFinished) { - printError("marking step %d of build %d as orphaned", stepNr, build->id); + printError("marking step %d of build %d as orphaned", stepNr, buildId); auto orphanedSteps_(orphanedSteps.lock()); - orphanedSteps_->emplace(build->id, stepNr); + orphanedSteps_->emplace(buildId, stepNr); } }); @@ -127,20 +148,33 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, { auto mc = startDbUpdate(); pqxx::work txn(*conn); - stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bsBusy); + stepNr = createBuildStep(txn, result.startTime, buildId, step, machine->sshName, bsBusy); txn.commit(); } /* Do the build. */ try { /* FIXME: referring builds may have conflicting timeouts. 
*/ - buildRemote(destStore, machine, step, build->maxSilentTime, build->buildTimeout, result); + buildRemote(destStore, machine, step, maxSilentTime, buildTimeout, result); } catch (NoTokens & e) { result.stepStatus = bsNarSizeLimitExceeded; } catch (Error & e) { result.stepStatus = bsAborted; result.errorMsg = e.msg(); result.canRetry = true; + } catch (__cxxabiv1::__forced_unwind & e) { + /* The queue monitor thread cancelled this step. */ + try { + printInfo("marking step %d of build %d as succeeded", stepNr, buildId); + pqxx::work txn(*conn); + finishBuildStep(txn, result.startTime, time(0), result.overhead, buildId, + stepNr, machine->sshName, bsCancelled, ""); + txn.commit(); + stepFinished = true; + } catch (...) { + ignoreException(); + } + throw; } if (result.stepStatus == bsSuccess) @@ -167,7 +201,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, { auto logCompressorQueue_(logCompressorQueue.lock()); assert(stepNr); - logCompressorQueue_->push({build->id, stepNr, result.logFile}); + logCompressorQueue_->push({buildId, stepNr, result.logFile}); } logCompressorWakeup.notify_one(); } @@ -187,7 +221,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, auto mc = startDbUpdate(); { pqxx::work txn(*conn); - finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, build->id, + finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, buildId, stepNr, machine->sshName, result.stepStatus, result.errorMsg); txn.commit(); } @@ -243,11 +277,11 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, pqxx::work txn(*conn); finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, - build->id, stepNr, machine->sshName, bsSuccess); + buildId, stepNr, machine->sshName, bsSuccess); for (auto & b : direct) { printMsg(lvlInfo, format("marking build %1% as succeeded") % b->id); - markSucceededBuild(txn, b, res, build != b || result.isCached, + 
markSucceededBuild(txn, b, res, buildId != b->id || result.isCached, result.startTime, result.stopTime); } @@ -340,17 +374,17 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, redundant with the build's isCachedBuild field). */ for (auto & build2 : indirect) { if ((result.stepStatus == bsCachedFailure && build2->drvPath == step->drvPath) || - (result.stepStatus != bsCachedFailure && build == build2) || + (result.stepStatus != bsCachedFailure && buildId == build2->id) || build2->finishedInDB) continue; - createBuildStep(txn, 0, build2, step, machine->sshName, - result.stepStatus, result.errorMsg, build == build2 ? 0 : build->id); + createBuildStep(txn, 0, build2->id, step, machine->sshName, + result.stepStatus, result.errorMsg, buildId == build2->id ? 0 : buildId); } if (result.stepStatus != bsCachedFailure && !stepFinished) { assert(stepNr); finishBuildStep(txn, result.startTime, result.stopTime, result.overhead, - build->id, stepNr, machine->sshName, result.stepStatus, result.errorMsg); + buildId, stepNr, machine->sshName, result.stepStatus, result.errorMsg); } /* Mark all builds that depend on this derivation as failed. */ @@ -392,7 +426,7 @@ State::StepResult State::doBuildStep(nix::ref destStore, Step::ptr step, /* Send notification about this build and its dependents. 
*/ { auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, build->id, dependentIDs}); + notificationSenderQueue_->push(NotificationItem{NotificationItem::Type::BuildFinished, buildId, dependentIDs}); } notificationSenderWakeup.notify_one(); diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index c341d7e5..7e602a54 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -224,25 +224,25 @@ void State::clearBusy(Connection & conn, time_t stopTime) } -unsigned int State::allocBuildStep(pqxx::work & txn, Build::ptr build) +unsigned int State::allocBuildStep(pqxx::work & txn, BuildID buildId) { /* Acquire an exclusive lock on BuildSteps to ensure that we don't race with other threads creating a step of the same build. */ txn.exec("lock table BuildSteps in exclusive mode"); - auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(build->id).exec(); + auto res = txn.parameterized("select max(stepnr) from BuildSteps where build = $1")(buildId).exec(); return res[0][0].is_null() ? 
1 : res[0][0].as() + 1; } -unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, +unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step, const std::string & machine, BuildStatus status, const std::string & errorMsg, BuildID propagatedFrom) { - unsigned int stepNr = allocBuildStep(txn, build); + auto stepNr = allocBuildStep(txn, buildId); txn.parameterized ("insert into BuildSteps (build, stepnr, type, drvPath, busy, startTime, system, status, propagatedFrom, errorMsg, stopTime, machine) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)") - (build->id) + (buildId) (stepNr) (0) // == build (step->drvPath) @@ -258,7 +258,7 @@ unsigned int State::createBuildStep(pqxx::work & txn, time_t startTime, Build::p for (auto & output : step->drv.outputs) txn.parameterized ("insert into BuildStepOutputs (build, stepnr, name, path) values ($1, $2, $3, $4)") - (build->id)(stepNr)(output.first)(output.second.path).exec(); + (buildId)(stepNr)(output.first)(output.second.path).exec(); return stepNr; } @@ -284,7 +284,7 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, int State::createSubstitutionStep(pqxx::work & txn, time_t startTime, time_t stopTime, Build::ptr build, const Path & drvPath, const string & outputName, const Path & storePath) { - int stepNr = allocBuildStep(txn, build); + auto stepNr = allocBuildStep(txn, build->id); txn.parameterized ("insert into BuildSteps (build, stepnr, type, drvPath, busy, status, startTime, stopTime) values ($1, $2, $3, $4, $5, $6, $7, $8)") @@ -574,7 +574,7 @@ void State::dumpStatus(Connection & conn, bool log) if (i->lock()) ++i; else i = runnable_->erase(i); root.attr("nrRunnableSteps", runnable_->size()); } - root.attr("nrActiveSteps", nrActiveSteps); + root.attr("nrActiveSteps", activeSteps_.lock()->size()); root.attr("nrStepsBuilding", nrStepsBuilding); root.attr("nrStepsCopyingTo", 
nrStepsCopyingTo); root.attr("nrStepsCopyingFrom", nrStepsCopyingFrom); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 86c6bf2e..d72d1989 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -2,6 +2,8 @@ #include "build-result.hh" #include "globals.hh" +#include + using namespace nix; @@ -180,7 +182,7 @@ bool State::getQueuedBuilds(Connection & conn, } } - createBuildStep(txn, 0, build, ex.step, "", bsCachedFailure, "", propagatedFrom); + createBuildStep(txn, 0, build->id, ex.step, "", bsCachedFailure, "", propagatedFrom); txn.parameterized ("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = 1 where id = $1 and finished = 0") (build->id) @@ -312,22 +314,44 @@ void State::processQueueChange(Connection & conn) currentIds[row["id"].as()] = row["globalPriority"].as(); } - auto builds_(builds.lock()); + { + auto builds_(builds.lock()); - for (auto i = builds_->begin(); i != builds_->end(); ) { - auto b = currentIds.find(i->first); - if (b == currentIds.end()) { - printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); - i = builds_->erase(i); - // FIXME: ideally we would interrupt active build steps here. - continue; + for (auto i = builds_->begin(); i != builds_->end(); ) { + auto b = currentIds.find(i->first); + if (b == currentIds.end()) { + printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); + i = builds_->erase(i); + // Active build steps left without any referring build are cancelled further below. 
+ continue; + } + if (i->second->globalPriority < b->second) { + printMsg(lvlInfo, format("priority of build %1% increased") % i->first); + i->second->globalPriority = b->second; + i->second->propagatePriorities(); + } + ++i; } - if (i->second->globalPriority < b->second) { - printMsg(lvlInfo, format("priority of build %1% increased") % i->first); - i->second->globalPriority = b->second; - i->second->propagatePriorities(); + } + + { + auto activeSteps(activeSteps_.lock()); + for (auto & activeStep : *activeSteps) { + auto threadId = activeStep->threadId; // FIXME: use Sync or atomic? + if (threadId == 0) continue; + + std::set dependents; + std::set steps; + getDependents(activeStep->step, dependents, steps); + if (!dependents.empty()) continue; + + printInfo("cancelling thread for build step ‘%s’", activeStep->step->drvPath); + + int err = pthread_cancel(threadId); + if (err) + printError("error cancelling thread for build step ‘%s’: %s", + activeStep->step->drvPath, strerror(err)); } - ++i; } } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 92bba1ae..31898d01 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -28,6 +28,7 @@ typedef enum { bsFailed = 1, bsDepFailed = 2, // builds only bsAborted = 3, + bsCancelled = 4, bsFailedWithOutput = 6, // builds only bsTimedOut = 7, bsCachedFailure = 8, // steps only @@ -296,7 +297,6 @@ private: counter nrBuildsDone{0}; counter nrStepsStarted{0}; counter nrStepsDone{0}; - counter nrActiveSteps{0}; counter nrStepsBuilding{0}; counter nrStepsCopyingTo{0}; counter nrStepsCopyingFrom{0}; @@ -370,10 +370,14 @@ private: State & state; Step::ptr step; Machine::ptr machine; + pthread_t threadId = 0; + bool cancelled = false; MachineReservation(State & state, Step::ptr step, Machine::ptr machine); ~MachineReservation(); }; + nix::Sync>> activeSteps_; + std::atomic lastDispatcherCheck{0}; std::shared_ptr localStore; @@ -413,9 +417,9 @@ private: /* Thread to 
reload /etc/nix/machines periodically. */ void monitorMachinesFile(); - unsigned int allocBuildStep(pqxx::work & txn, Build::ptr build); + unsigned int allocBuildStep(pqxx::work & txn, BuildID buildId); - unsigned int createBuildStep(pqxx::work & txn, time_t startTime, Build::ptr build, Step::ptr step, + unsigned int createBuildStep(pqxx::work & txn, time_t startTime, BuildID buildId, Step::ptr step, const std::string & machine, BuildStatus status, const std::string & errorMsg = "", BuildID propagatedFrom = 0); diff --git a/src/root/build.tt b/src/root/build.tt index f6a5313d..68548b98 100644 --- a/src/root/build.tt +++ b/src/root/build.tt @@ -23,7 +23,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END; [% FOREACH step IN steps %] - [% IF ( type == "All" ) || ( type == "Failed" && step.status != 0 ) || ( type == "Running" && step.busy == 1 ) %] + [% IF ( type == "All" ) || ( type == "Failed" && step.busy == 0 && step.status != 0 ) || ( type == "Running" && step.busy == 1 ) %] [% has_log = seen.${step.drvpath} ? 
0 : buildStepLogExists(step); seen.${step.drvpath} = 1; log = c.uri_for('/build' build.id 'nixlog' step.stepnr); %] @@ -49,7 +49,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END; INCLUDE renderDuration duration = curTime - step.starttime; END %] - [% IF step.busy == 1 || ((step.machine || step.starttime) && (step.status == 0 || step.status == 1 || step.status == 3 || step.status == 7)); INCLUDE renderMachineName machine=step.machine; ELSE; "n/a"; END %] + [% IF step.busy == 1 || ((step.machine || step.starttime) && (step.status == 0 || step.status == 1 || step.status == 3 || step.status == 4 || step.status == 7)); INCLUDE renderMachineName machine=step.machine; ELSE; "n/a"; END %] [% IF step.busy == 1 %] Building @@ -57,6 +57,8 @@ FOR step IN steps; IF step.busy; busy = 1; END; END; Succeeded [% ELSIF step.status == 3 %] Aborted[% IF step.errormsg %]: [% HTML.escape(step.errormsg); END %] + [% ELSIF step.status == 4 %] + Cancelled [% ELSIF step.status == 7 %] Timed out [% ELSIF step.status == 8 %] @@ -242,7 +244,7 @@ FOR step IN steps; IF step.busy; busy = 1; END; END; [% IF build.finished %] - [% IF steps && build.buildstatus != 0 && build.buildstatus != 6 %] + [% IF steps && build.buildstatus != 0 && build.buildstatus != 4 && build.buildstatus != 6 %]

Failed build steps

[% INCLUDE renderBuildSteps type="Failed" %] [% END %] diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql index f9f6fc73..44a2eb14 100644 --- a/src/sql/hydra.sql +++ b/src/sql/hydra.sql @@ -203,7 +203,7 @@ create table Builds ( -- 1 = regular Nix failure (derivation returned non-zero exit code) -- 2 = build of a dependency failed [builds only] -- 3 = build or step aborted due to misc failure - -- 4 = build cancelled (removed from queue; never built) [builds only] + -- 4 = build or step cancelled -- 5 = [obsolete] -- 6 = failure with output (i.e. $out/nix-support/failed exists) [builds only] -- 7 = build timed out