hydra-queue-runner: Fix segfault sorting machines by load

While sorting machines by load, the load of a machine
(machine->currentJobs) can be changed by other threads. If that
happens, the comparator is no longer a proper ordering, in which case
std::sort() can segfault. So we now make a copy of currentJobs before
sorting.
This commit is contained in:
Eelco Dolstra 2015-06-21 16:21:42 +02:00
parent a0eff6fc15
commit d744362e4a

View file

@ -862,12 +862,19 @@ void State::dispatcher()
bool keepGoing; bool keepGoing;
do { do {
/* Bail out when there are no slots left. */ /* Copy the currentJobs field of each machine. This is
std::vector<Machine::ptr> machinesSorted; necessary to ensure that the sort comparator below is a
ordering. std::sort() can segfault if it isn't. */
struct MachineInfo
{
Machine::ptr machine;
unsigned int currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{ {
auto machines_(machines.lock()); auto machines_(machines.lock());
machinesSorted.insert(machinesSorted.end(), for (auto & m : *machines_)
machines_->begin(), machines_->end()); machinesSorted.push_back({m, m->currentJobs});
} }
/* Sort the machines by a combination of speed factor and /* Sort the machines by a combination of speed factor and
@ -882,14 +889,14 @@ void State::dispatcher()
- Finally by load. */ - Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(), sort(machinesSorted.begin(), machinesSorted.end(),
[](const Machine::ptr & a, const Machine::ptr & b) -> bool [](const MachineInfo & a, const MachineInfo & b) -> bool
{ {
float ta = roundf(a->currentJobs / a->speedFactor); float ta = roundf(a.currentJobs / a.machine->speedFactor);
float tb = roundf(b->currentJobs / b->speedFactor); float tb = roundf(b.currentJobs / b.machine->speedFactor);
return return
ta != tb ? ta < tb : ta != tb ? ta < tb :
a->speedFactor != b->speedFactor ? a->speedFactor > b->speedFactor : a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a->currentJobs > b->currentJobs; a.currentJobs > b.currentJobs;
}); });
/* Find a machine with a free slot and find a step to run /* Find a machine with a free slot and find a step to run
@ -898,9 +905,9 @@ void State::dispatcher()
keepGoing = false; keepGoing = false;
system_time now = std::chrono::system_clock::now(); system_time now = std::chrono::system_clock::now();
for (auto & machine : machinesSorted) { for (auto & mi : machinesSorted) {
// FIXME: can we lose a wakeup if a builder exits concurrently? // FIXME: can we lose a wakeup if a builder exits concurrently?
if (machine->currentJobs >= machine->maxJobs) continue; if (mi.machine->currentJobs >= mi.machine->maxJobs) continue;
auto runnable_(runnable.lock()); auto runnable_(runnable.lock());
//printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
@ -918,7 +925,7 @@ void State::dispatcher()
} }
/* Can this machine do this step? */ /* Can this machine do this step? */
if (!machine->supportsStep(step)) { if (!mi.machine->supportsStep(step)) {
++i; ++i;
continue; continue;
} }
@ -937,7 +944,7 @@ void State::dispatcher()
/* Make a slot reservation and start a thread to /* Make a slot reservation and start a thread to
do the build. */ do the build. */
auto reservation = std::make_shared<MachineReservation>(machine); auto reservation = std::make_shared<MachineReservation>(mi.machine);
i = runnable_->erase(i); i = runnable_->erase(i);
auto builderThread = std::thread(&State::builder, this, step, reservation); auto builderThread = std::thread(&State::builder, this, step, reservation);