Automatically retry aborted builds

Aborted builds are now put back on the runnable queue and retried
after an exponentially increasing time interval (currently 60 seconds
before the first retry, tripled on each subsequent retry).
This commit is contained in:
Eelco Dolstra 2015-06-17 11:45:20 +02:00
parent e02654b3a0
commit b91a616520
4 changed files with 145 additions and 61 deletions

View file

@ -84,7 +84,7 @@ static void copyClosureTo(std::shared_ptr<StoreAPI> store,
for (auto i = sorted.rbegin(); i != sorted.rend(); ++i) for (auto i = sorted.rbegin(); i != sorted.rend(); ++i)
if (present.find(*i) == present.end()) missing.push_back(*i); if (present.find(*i) == present.end()) missing.push_back(*i);
printMsg(lvlError, format("sending %1% missing paths") % missing.size()); printMsg(lvlDebug, format("sending %1% missing paths") % missing.size());
writeInt(cmdImportPaths, to); writeInt(cmdImportPaths, to);
exportPaths(*store, missing, false, to); exportPaths(*store, missing, false, to);
@ -128,6 +128,7 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
FdSink to(child.to); FdSink to(child.to);
/* Handshake. */ /* Handshake. */
try {
writeInt(SERVE_MAGIC_1, to); writeInt(SERVE_MAGIC_1, to);
writeInt(SERVE_PROTOCOL_VERSION, to); writeInt(SERVE_PROTOCOL_VERSION, to);
to.flush(); to.flush();
@ -138,13 +139,17 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
unsigned int version = readInt(from); unsigned int version = readInt(from);
if (GET_PROTOCOL_MAJOR(version) != 0x200) if (GET_PROTOCOL_MAJOR(version) != 0x200)
throw Error(format("unsupported nix-store --serve protocol version on %1%") % sshName); throw Error(format("unsupported nix-store --serve protocol version on %1%") % sshName);
} catch (EndOfFile & e) {
child.pid.wait(true);
throw Error(format("cannot connect to %1%: %2%") % sshName % chomp(readFile(logFile)));
}
/* Copy the input closure. */ /* Copy the input closure. */
printMsg(lvlError, format("sending closure of %1% to %2%") % drvPath % sshName); printMsg(lvlDebug, format("sending closure of %1% to %2%") % drvPath % sshName);
copyClosureTo(store, from, to, PathSet({drvPath})); copyClosureTo(store, from, to, PathSet({drvPath}));
/* Do the build. */ /* Do the build. */
printMsg(lvlError, format("building %1% on %2%") % drvPath % sshName); printMsg(lvlDebug, format("building %1% on %2%") % drvPath % sshName);
writeInt(cmdBuildPaths, to); writeInt(cmdBuildPaths, to);
writeStrings(PathSet({drvPath}), to); writeStrings(PathSet({drvPath}), to);
writeInt(3600, to); // == maxSilentTime, FIXME writeInt(3600, to); // == maxSilentTime, FIXME
@ -162,7 +167,7 @@ void buildRemote(std::shared_ptr<StoreAPI> store,
} }
/* Copy the output paths. */ /* Copy the output paths. */
printMsg(lvlError, format("copying outputs of %1% from %2%") % drvPath % sshName); printMsg(lvlDebug, format("copying outputs of %1% from %2%") % drvPath % sshName);
PathSet outputs; PathSet outputs;
for (auto & output : drv.outputs) for (auto & output : drv.outputs)
outputs.insert(output.second.path); outputs.insert(output.second.path);

View file

@ -4,6 +4,8 @@
#include <map> #include <map>
#include <memory> #include <memory>
#include <thread> #include <thread>
#include <cmath>
#include <chrono>
#include <pqxx/pqxx> #include <pqxx/pqxx>
@ -20,6 +22,14 @@
using namespace nix; using namespace nix;
const int maxTries = 5;
const int retryInterval = 60; // seconds
const float retryBackoff = 3.0;
typedef std::chrono::time_point<std::chrono::system_clock> system_time;
template <class C, class V> template <class C, class V>
bool has(const C & c, const V & v) bool has(const C & c, const V & v)
{ {
@ -100,6 +110,12 @@ struct Step
/* Builds that have this step as the top-level derivation. */ /* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds; std::vector<Build::wptr> builds;
/* Number of times we've tried this step. */
unsigned int tries = 0;
/* Point in time after which the step can be retried. */
system_time after;
}; };
Sync<State> state; Sync<State> state;
@ -108,10 +124,7 @@ struct Step
Step() : destroyed(false) { } Step() : destroyed(false) { }
~Step() ~Step() { }
{
printMsg(lvlDebug, format("destroying step %1%") % drvPath);
}
}; };
@ -198,7 +211,10 @@ private:
Sync<Machines> machines; Sync<Machines> machines;
/* Various stats. */ /* Various stats. */
std::atomic<int> nrQueueWakeups; std::atomic<unsigned int> nrRetries;
std::atomic<unsigned int> maxNrRetries;
std::atomic<unsigned int> nrQueueWakeups;
std::atomic<unsigned int> nrDispatcherWakeups;
public: public:
State(); State();
@ -246,7 +262,9 @@ public:
void builder(Step::ptr step, MachineReservation::ptr reservation); void builder(Step::ptr step, MachineReservation::ptr reservation);
void doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step, /* Perform the given build step. Return true if the step is to be
retried. */
bool doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine); Machine::ptr machine);
void markSucceededBuild(pqxx::work & txn, Build::ptr build, void markSucceededBuild(pqxx::work & txn, Build::ptr build,
@ -262,7 +280,7 @@ public:
State::State() State::State()
{ {
nrQueueWakeups = 0; nrRetries = maxNrRetries = nrQueueWakeups = nrDispatcherWakeups = 0;
hydraData = getEnv("HYDRA_DATA"); hydraData = getEnv("HYDRA_DATA");
if (hydraData == "") throw Error("$HYDRA_DATA must be set"); if (hydraData == "") throw Error("$HYDRA_DATA must be set");
@ -799,6 +817,8 @@ void State::dispatcher()
while (true) { while (true) {
printMsg(lvlDebug, "dispatcher woken up"); printMsg(lvlDebug, "dispatcher woken up");
auto sleepUntil = system_time::max();
{ {
auto runnable_(runnable.lock()); auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
@ -806,6 +826,8 @@ void State::dispatcher()
/* FIXME: we're holding the runnable lock too long /* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */ here. This could be more efficient. */
system_time now = std::chrono::system_clock::now();
for (auto i = runnable_->begin(); i != runnable_->end(); ) { for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock(); auto step = i->lock();
@ -815,6 +837,18 @@ void State::dispatcher()
continue; continue;
} }
/* Skip previously failed steps that aren't ready to
be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}
auto reservation = findMachine(step); auto reservation = findMachine(step);
if (!reservation) { if (!reservation) {
printMsg(lvlDebug, format("cannot execute step %1% right now") % step->drvPath); printMsg(lvlDebug, format("cannot execute step %1% right now") % step->drvPath);
@ -833,7 +867,10 @@ void State::dispatcher()
is added, or because a build finishes). */ is added, or because a build finishes). */
{ {
std::unique_lock<std::mutex> lock(dispatcherMutex); std::unique_lock<std::mutex> lock(dispatcherMutex);
dispatcherWakeup.wait(lock); printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup.wait_until(lock, sleepUntil);
nrDispatcherWakeups++;
} }
} }
@ -871,22 +908,40 @@ MachineReservation::ptr State::findMachine(Step::ptr step)
void State::builder(Step::ptr step, MachineReservation::ptr reservation) void State::builder(Step::ptr step, MachineReservation::ptr reservation)
{ {
bool retry = true;
try { try {
auto store = openStore(); // FIXME: pool auto store = openStore(); // FIXME: pool
doBuildStep(store, step, reservation->machine); retry = doBuildStep(store, step, reservation->machine);
} catch (std::exception & e) { } catch (std::exception & e) {
printMsg(lvlError, format("error building %1%: %2%") % step->drvPath % e.what()); printMsg(lvlError, format("uncaught exception building %1% on %2%: %3%")
// FIXME: put step back in runnable and retry % step->drvPath % reservation->machine->sshName % e.what());
} }
/* Release the machine and wake up the dispatcher. */ /* Release the machine and wake up the dispatcher. */
assert(reservation.unique()); assert(reservation.unique());
reservation = 0; reservation = 0;
wakeDispatcher(); wakeDispatcher();
/* If there was a temporary failure, retry the step after an
exponentially increasing interval. */
if (retry) {
{
auto step_(step->state.lock());
step_->tries++;
nrRetries++;
if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
int delta = retryInterval * powf(retryBackoff, step_->tries - 1);
printMsg(lvlInfo, format("will retry %1% after %2%s") % step->drvPath % delta);
step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
}
makeRunnable(step);
}
} }
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step, bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine) Machine::ptr machine)
{ {
/* There can be any number of builds in the database that depend /* There can be any number of builds in the database that depend
@ -903,14 +958,16 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
if (dependents.empty()) { if (dependents.empty()) {
/* Apparently all builds that depend on this derivation /* Apparently all builds that depend on this derivation
are gone (e.g. cancelled). So don't bother. (This is are gone (e.g. cancelled). So don't bother. This is
very unlikely to happen, because normally Steps are very unlikely to happen, because normally Steps are
only kept alive by being reachable from a only kept alive by being reachable from a
Build). FIXME: what if a new Build gets a reference to Build. However, it's possible that a new Build just
this step? */ created a reference to this step. So to handle that
possibility, we retry this step (putting it back in
the runnable queue). If there are really no strong
pointers to the step, it will be deleted. */
printMsg(lvlInfo, format("cancelling build step %1%") % step->drvPath); printMsg(lvlInfo, format("cancelling build step %1%") % step->drvPath);
destroyStep(step, false); return true;
return;
} }
for (auto build2 : dependents) for (auto build2 : dependents)
@ -930,8 +987,8 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
result.startTime = time(0); result.startTime = time(0);
/* If any of the outputs have previously failed, then don't /* If any of the outputs have previously failed, then don't bother
retry. */ building again. */
bool cachedFailure = checkCachedFailure(step, *conn); bool cachedFailure = checkCachedFailure(step, *conn);
if (cachedFailure) if (cachedFailure)
@ -952,8 +1009,8 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
} catch (Error & e) { } catch (Error & e) {
result.status = RemoteResult::rrMiscFailure; result.status = RemoteResult::rrMiscFailure;
result.errorMsg = e.msg(); result.errorMsg = e.msg();
printMsg(lvlError, format("ERROR: %1%") % e.msg()); printMsg(lvlError, format("irregular failure building %1% on %2%: %3%")
abort(); // FIXME % step->drvPath % machine->sshName % e.msg());
} }
if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv); if (result.status == RemoteResult::rrSuccess) res = getBuildResult(store, step->drv);
@ -963,13 +1020,19 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
if (!result.stopTime) result.stopTime = time(0); if (!result.stopTime) result.stopTime = time(0);
bool retry = false;
if (result.status == RemoteResult::rrMiscFailure) {
auto step_(step->state.lock());
retry = step_->tries + 1 < maxTries;
}
/* Remove this step. After this, incoming builds that depend on /* Remove this step. After this, incoming builds that depend on
drvPath will either see that the output paths exist, or will drvPath will either see that the output paths exist, or will
create a new build step for drvPath. The latter is fine - it create a new build step for drvPath. The latter is fine - it
won't conflict with this one, because we're removing it. In any won't conflict with this one, because we're removing it. In any
case, the set of dependent builds for step can't increase case, the set of dependent builds for step can't increase
anymore because step is no longer visible to createStep(). */ anymore because step is no longer visible to createStep(). */
{ if (!retry) {
auto steps_(steps.lock()); auto steps_(steps.lock());
steps_->erase(step->drvPath); steps_->erase(step->drvPath);
} }
@ -1002,29 +1065,39 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
} else { } else {
/* Failure case. */ /* Failure case. */
BuildStatus buildStatus =
result.status == RemoteResult::rrPermanentFailure ? bsFailed : bsAborted;
BuildStepStatus buildStepStatus =
result.status == RemoteResult::rrPermanentFailure ? bssFailed : bssAborted;
/* For regular failures, we don't care about the error /* For regular failures, we don't care about the error
message. */ message. */
if (result.status != RemoteResult::rrMiscFailure) result.errorMsg = ""; if (buildStatus != bsAborted) result.errorMsg = "";
if (!cachedFailure) { if (!cachedFailure && !retry) {
/* Create failed build steps for every build that depends /* Create failed build steps for every build that depends
on this. */ on this. */
for (auto build2 : dependents) { for (auto build2 : dependents) {
if (build == build2) continue; if (build == build2) continue;
createBuildStep(txn, 0, build2, step, machine->sshName, bssFailed, result.errorMsg, build->id); createBuildStep(txn, 0, build2, step, machine->sshName,
buildStepStatus, result.errorMsg, build->id);
} }
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssFailed, result.errorMsg);
} }
if (!cachedFailure)
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, buildStepStatus, result.errorMsg);
/* Mark all builds that depend on this derivation as failed. */ /* Mark all builds that depend on this derivation as failed. */
if (!retry)
for (auto build2 : dependents) { for (auto build2 : dependents) {
printMsg(lvlError, format("marking build %1% as failed") % build2->id); printMsg(lvlError, format("marking build %1% as failed") % build2->id);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1")
(build2->id) (build2->id)
((int) (build2->drvPath == step->drvPath ? bsFailed : bsDepFailed)) ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
(result.startTime) (result.startTime)
(result.stopTime) (result.stopTime)
(cachedFailure ? 1 : 0).exec(); (cachedFailure ? 1 : 0).exec();
@ -1045,6 +1118,7 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
is the top-level derivation. In case of failure, destroy all is the top-level derivation. In case of failure, destroy all
dependent Build objects. Any Steps not referenced by other dependent Build objects. Any Steps not referenced by other
Builds will be destroyed as well. */ Builds will be destroyed as well. */
if (!retry)
for (auto build2 : dependents) for (auto build2 : dependents)
if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) { if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) {
auto builds_(builds.lock()); auto builds_(builds.lock());
@ -1054,7 +1128,10 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
/* Remove the step from the graph. In case of success, make /* Remove the step from the graph. In case of success, make
dependent build steps runnable if they have no other dependent build steps runnable if they have no other
dependencies. */ dependencies. */
if (!retry)
destroyStep(step, result.status == RemoteResult::rrSuccess); destroyStep(step, result.status == RemoteResult::rrSuccess);
return retry;
} }
@ -1122,7 +1199,11 @@ void State::dumpStatus()
if (i->lock()) ++i; else i = runnable_->erase(i); if (i->lock()) ++i; else i = runnable_->erase(i);
printMsg(lvlError, format("%1% runnable build steps") % runnable_->size()); printMsg(lvlError, format("%1% runnable build steps") % runnable_->size());
} }
printMsg(lvlError, format("%1% times woken up to check the queue") % nrQueueWakeups); printMsg(lvlError, format("%1% build step retries") % nrRetries);
printMsg(lvlError, format("%1% most retries for any build step") % maxNrRetries);
printMsg(lvlError, format("%1% queue wakeups") % nrQueueWakeups);
printMsg(lvlError, format("%1% dispatcher wakeups") % nrDispatcherWakeups);
printMsg(lvlError, format("%1% database connections") % dbPool.count());
{ {
auto machines_(machines.lock()); auto machines_(machines.lock());
for (auto & m : *machines_) { for (auto & m : *machines_) {
@ -1145,9 +1226,6 @@ void State::run()
std::thread(&State::dispatcher, this).detach(); std::thread(&State::dispatcher, this).detach();
queueMonitorThread.join(); queueMonitorThread.join();
//printMsg(lvlInfo, "exiting...");
//printMsg(lvlInfo, format("psql connections = %1%") % dbPool.count());
} }

View file

@ -51,7 +51,7 @@
[% ELSIF step.status == 0 %] [% ELSIF step.status == 0 %]
Succeeded Succeeded
[% ELSIF step.status == 4 %] [% ELSIF step.status == 4 %]
<span class="error">Aborted</span> <span class="error"><strong>Aborted</strong>[% IF step.errormsg %]: [% HTML.escape(step.errormsg); END %]</span>
[% ELSIF step.status == 7 %] [% ELSIF step.status == 7 %]
<span class="error">Timed out</span> <span class="error">Timed out</span>
[% ELSIF step.status == 8 %] [% ELSIF step.status == 8 %]

View file

@ -372,6 +372,7 @@ create table CachedCVSInputs (
); );
-- FIXME: remove
create table SystemTypes ( create table SystemTypes (
system text primary key not null, system text primary key not null,
maxConcurrent integer not null default 2 maxConcurrent integer not null default 2