Temporarily disable machines after a connection failure

This commit is contained in:
Eelco Dolstra 2015-07-21 15:53:27 +02:00
parent 7e026d35f7
commit c18fb0ad74
4 changed files with 43 additions and 4 deletions

View file

@ -160,10 +160,33 @@ void State::buildRemote(std::shared_ptr<StoreAPI> store,
sendDerivation = false; sendDerivation = false;
} catch (EndOfFile & e) { } catch (EndOfFile & e) {
child.pid.wait(true); child.pid.wait(true);
{
/* Disable this machine until a certain period of time has
passed. This period increases on every consecutive
failure. However, don't count failures that occurred
soon after the last one (to take into account steps
started in parallel). */
auto info(machine->state->connectInfo.lock());
auto now = std::chrono::system_clock::now();
if (info->consecutiveFailures == 0 || info->lastFailure < now - std::chrono::seconds(30)) {
info->consecutiveFailures = std::min(info->consecutiveFailures + 1, (unsigned int) 4);
info->lastFailure = now;
int delta = retryInterval * powf(retryBackoff, info->consecutiveFailures - 1) + (rand() % 30);
printMsg(lvlInfo, format("will disable machine %1% for %2%s") % machine->sshName % delta);
info->disabledUntil = now + std::chrono::seconds(delta);
}
}
string s = chomp(readFile(result.logFile)); string s = chomp(readFile(result.logFile));
throw Error(format("cannot connect to %1%: %2%") % machine->sshName % s); throw Error(format("cannot connect to %1%: %2%") % machine->sshName % s);
} }
{
auto info(machine->state->connectInfo.lock());
info->consecutiveFailures = 0;
}
/* Gather the inputs. If the remote side is Nix <= 1.9, we have to /* Gather the inputs. If the remote side is Nix <= 1.9, we have to
copy the entire closure of drvPath, as well the required copy the entire closure of drvPath, as well the required
outputs of the input derivations. On Nix > 1.9, we only need to outputs of the input derivations. On Nix > 1.9, we only need to

View file

@ -33,7 +33,7 @@ void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr<Mainta
step_->tries++; step_->tries++;
nrRetries++; nrRetries++;
if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic
int delta = retryInterval * powf(retryBackoff, step_->tries - 1); int delta = retryInterval * powf(retryBackoff, step_->tries - 1) + (rand() % 10);
printMsg(lvlInfo, format("will retry %1% after %2%s") % step->drvPath % delta); printMsg(lvlInfo, format("will retry %1% after %2%s") % step->drvPath % delta);
step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta); step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta);
} }

View file

@ -36,9 +36,12 @@ void State::dispatcher()
bool keepGoing; bool keepGoing;
do { do {
system_time now = std::chrono::system_clock::now();
/* Copy the currentJobs field of each machine. This is /* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. */ an ordering. std::sort() can segfault if it isn't. Also
filter out temporarily disabled machines. */
struct MachineInfo struct MachineInfo
{ {
Machine::ptr machine; Machine::ptr machine;
@ -47,8 +50,15 @@ void State::dispatcher()
std::vector<MachineInfo> machinesSorted; std::vector<MachineInfo> machinesSorted;
{ {
auto machines_(machines.lock()); auto machines_(machines.lock());
for (auto & m : *machines_) for (auto & m : *machines_) {
auto info(m.second->state->connectInfo.lock());
if (info->consecutiveFailures && info->disabledUntil > now) {
if (info->disabledUntil < sleepUntil)
sleepUntil = info->disabledUntil;
continue;
}
machinesSorted.push_back({m.second, m.second->state->currentJobs}); machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
} }
/* Sort the machines by a combination of speed factor and /* Sort the machines by a combination of speed factor and
@ -77,7 +87,6 @@ void State::dispatcher()
on it. Once we find such a pair, we restart the outer on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */ loop because the machine sorting will have changed. */
keepGoing = false; keepGoing = false;
system_time now = std::chrono::system_clock::now();
for (auto & mi : machinesSorted) { for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently? // FIXME: can we lose a wakeup if a builder exits concurrently?

View file

@ -137,6 +137,13 @@ struct Machine
counter totalStepTime{0}; // total time for steps, including closure copying counter totalStepTime{0}; // total time for steps, including closure copying
counter totalStepBuildTime{0}; // total build time for steps counter totalStepBuildTime{0}; // total build time for steps
/* Bookkeeping used to temporarily disable a machine after
   connection failures. */
struct ConnectInfo
{
    /* lastFailure: time at which the most recent counted connection
       failure was recorded.
       disabledUntil: while consecutiveFailures is non-zero and this
       time lies in the future, the dispatcher skips the machine. */
    system_time lastFailure, disabledUntil;
    /* Number of consecutive counted connection failures (the
       failure path caps it at 4); reset to 0 once a connection
       succeeds. Explicitly zero-initialised because it is read
       before any failure has been recorded, and a
       default-initialised ConnectInfo would otherwise leave it
       indeterminate. */
    unsigned int consecutiveFailures{0};
};
Sync<ConnectInfo> connectInfo;
/* Mutex to prevent multiple threads from sending data to the /* Mutex to prevent multiple threads from sending data to the
same machine (which would be inefficient). */ same machine (which would be inefficient). */
std::mutex sendLock; std::mutex sendLock;