hydra-queue-runner: Improve dispatcher

We now take the machine speed factor into account, just like
build-remote.pl.
This commit is contained in:
Eelco Dolstra 2015-06-18 01:52:20 +02:00
parent 3855131185
commit a40ca6b76e

View file

@ -6,6 +6,7 @@
#include <thread>
#include <cmath>
#include <chrono>
#include <algorithm>
#include <pqxx/pqxx>
@ -159,13 +160,7 @@ struct Machine
unsigned int maxJobs = 1;
float speedFactor = 1.0;
Sync<unsigned int> currentJobs;
Machine()
{
auto currentJobs_(currentJobs.lock());
*currentJobs_ = 0;
}
std::atomic<unsigned int> currentJobs{0};
bool supportsStep(Step::ptr step)
{
@ -187,13 +182,11 @@ struct MachineReservation
Machine::ptr machine;
MachineReservation(Machine::ptr machine) : machine(machine)
{
auto currentJobs_(machine->currentJobs.lock());
(*currentJobs_)++;
machine->currentJobs++;
}
~MachineReservation()
{
auto currentJobs_(machine->currentJobs.lock());
if (*currentJobs_ > 0) (*currentJobs_)--;
machine->currentJobs--;
}
};
@ -284,8 +277,6 @@ public:
void wakeDispatcher();
MachineReservation::ptr findMachine(Step::ptr step);
void builder(Step::ptr step, MachineReservation::ptr reservation);
/* Perform the given build step. Return true if the step is to be
@ -878,49 +869,98 @@ void State::dispatcher()
auto sleepUntil = system_time::max();
{
auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
bool keepGoing;
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
do {
/* Bail out when there are no slots left. */
std::vector<Machine::ptr> machinesSorted;
{
auto machines_(machines.lock());
machinesSorted.insert(machinesSorted.end(),
machines_->begin(), machines_->end());
}
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const Machine::ptr & a, const Machine::ptr & b) -> bool
{
float ta = roundf(a->currentJobs / a->speedFactor);
float tb = roundf(b->currentJobs / b->speedFactor);
return
ta != tb ? ta > tb :
a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor :
a->maxJobs > b->maxJobs;
});
/* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */
keepGoing = false;
system_time now = std::chrono::system_clock::now();
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
for (auto & machine : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently?
if (machine->currentJobs >= machine->maxJobs) continue;
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
/* Skip previously failed steps that aren't ready to
be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
/* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
auto step = i->lock();
/* Delete dead steps. */
if (!step) {
i = runnable_->erase(i);
continue;
}
/* Can this machine do this step? */
if (!machine->supportsStep(step)) {
++i;
continue;
}
/* Skip previously failed steps that aren't ready
to be retried. */
{
auto step_(step->state.lock());
if (step_->tries > 0 && step_->after > now) {
if (step_->after < sleepUntil)
sleepUntil = step_->after;
++i;
continue;
}
}
/* Make a slot reservation and start a thread to
do the build. */
auto reservation = std::make_shared<MachineReservation>(machine);
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?
keepGoing = true;
break;
}
auto reservation = findMachine(step);
if (!reservation) {
printMsg(lvlDebug, format("cannot execute step %1% right now") % step->drvPath);
++i;
continue;
}
i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation);
builderThread.detach(); // FIXME?
if (keepGoing) break;
}
}
} while (keepGoing);
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
@ -944,23 +984,6 @@ void State::wakeDispatcher()
}
MachineReservation::ptr State::findMachine(Step::ptr step)
{
auto machines_(machines.lock());
for (auto & machine : *machines_) {
if (!machine->supportsStep(step)) continue;
{
auto currentJobs_(machine->currentJobs.lock());
if (*currentJobs_ >= machine->maxJobs) continue;
}
return std::make_shared<MachineReservation>(machine);
}
return 0;
}
void State::builder(Step::ptr step, MachineReservation::ptr reservation)
{
bool retry = true;
@ -1274,9 +1297,8 @@ void State::dumpStatus()
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto currentJobs_(m->currentJobs.lock());
printMsg(lvlError, format("machine %1%: %2%/%3% active")
% m->sshName % *currentJobs_ % m->maxJobs);
% m->sshName % m->currentJobs % m->maxJobs);
}
}
}