hydra/src/hydra-queue-runner/dispatcher.cc

205 lines
6.5 KiB
C++
Raw Normal View History

#include <iostream>
2015-07-21 13:14:17 +00:00
#include <algorithm>
#include <thread>
#include "state.hh"
using namespace nix;
/* Append `step` to the runnable queue and wake the dispatcher. The
   step is expected to be created, not finished and without remaining
   dependencies; these expectations are enforced with assertions. */
void State::makeRunnable(Step::ptr step)
{
    printMsg(lvlChatty, format("step %1% is now runnable") % step->drvPath);

    /* Sanity-check the step through its locked state. */
    {
        auto stepState(step->state.lock());
        assert(stepState->created);
        assert(!step->finished);
        assert(stepState->deps.empty());
    }

    /* Enqueue the step in its own scope, ahead of the wake-up below. */
    {
        auto queue(runnable.lock());
        queue->push_back(step);
    }

    wakeDispatcher();
}
void State::dispatcher()
{
while (true) {
printMsg(lvlDebug, "dispatcher woken up");
2015-08-10 09:26:30 +00:00
auto sleepUntil = doDispatch();
2015-07-21 13:14:17 +00:00
2015-08-10 09:26:30 +00:00
/* Sleep until we're woken up (either because a runnable build
is added, or because a build finishes). */
{
auto dispatcherWakeup_(dispatcherWakeup.lock());
if (!*dispatcherWakeup_) {
printMsg(lvlDebug, format("dispatcher sleeping for %1%s") %
std::chrono::duration_cast<std::chrono::seconds>(sleepUntil - std::chrono::system_clock::now()).count());
dispatcherWakeup_.wait_until(dispatcherWakeupCV, sleepUntil);
}
2015-08-10 09:26:30 +00:00
nrDispatcherWakeups++;
*dispatcherWakeup_ = false;
2015-08-10 09:26:30 +00:00
}
}
2015-07-21 13:14:17 +00:00
2015-08-10 09:26:30 +00:00
printMsg(lvlError, "dispatcher exits");
}
2015-08-10 09:26:30 +00:00
/* Start runnable steps on machines with free slots.

   Each pass of the outer loop snapshots the usable machines and the
   runnable steps, sorts both, and starts at most one step on one
   machine. The loop repeats until a pass starts nothing.

   Returns the earliest time at which a temporarily disabled machine
   or a step awaiting a retry becomes eligible again, or
   system_time::max() if no such time was seen. */
system_time State::doDispatch()
{
    /* Start steps until we're out of steps or slots. */
    auto sleepUntil = system_time::max();
    bool keepGoing;

    do {
        system_time now = std::chrono::system_clock::now();

        /* Copy the currentJobs field of each machine. This is
           necessary to ensure that the sort comparator below is
           an ordering. std::sort() can segfault if it isn't. Also
           filter out temporarily disabled machines. */
        struct MachineInfo
        {
            Machine::ptr machine;
            unsigned int currentJobs;
        };
        std::vector<MachineInfo> machinesSorted;
        {
            auto machines_(machines.lock());
            for (auto & m : *machines_) {
                auto info(m.second->state->connectInfo.lock());
                if (info->consecutiveFailures && info->disabledUntil > now) {
                    /* Remember when the first disabled machine comes back. */
                    if (info->disabledUntil < sleepUntil)
                        sleepUntil = info->disabledUntil;
                    continue;
                }
                machinesSorted.push_back({m.second, m.second->state->currentJobs});
            }
        }

        /* Sort the machines by a combination of speed factor and
           available slots. Prioritise the available machines as
           follows:

           - First by load divided by speed factor, rounded to the
             nearest integer. This causes fast machines to be
             preferred over slow machines with similar loads.

           - Then by speed factor.

           - Finally by load. */
        std::sort(machinesSorted.begin(), machinesSorted.end(),
            [](const MachineInfo & a, const MachineInfo & b) -> bool
            {
                float ta = roundf(a.currentJobs / a.machine->speedFactor);
                float tb = roundf(b.currentJobs / b.machine->speedFactor);
                return
                    ta != tb ? ta < tb :
                    a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
                    a.currentJobs > b.currentJobs;
            });

        /* Sort the runnable steps by priority. FIXME: O(n lg n);
           obviously, it would be better to keep a runnable queue sorted
           by priority.

           As with MachineInfo, the sort key is copied out of each
           step's state first. This keeps the comparator a consistent
           ordering for the whole sort and means it never has to take
           two step locks at once. */
        struct StepInfo
        {
            Step::ptr step;
            /* Same type as the lowestBuildID field of the step state. */
            decltype(Step::ptr()->state.lock()->lowestBuildID) lowestBuildID;
        };
        std::vector<StepInfo> runnableSorted;
        {
            auto runnable_(runnable.lock());
            runnableSorted.reserve(runnable_->size());
            for (auto i = runnable_->begin(); i != runnable_->end(); ) {
                auto step = i->lock();

                /* Remove dead steps. */
                if (!step) {
                    i = runnable_->erase(i);
                    continue;
                }

                ++i;

                auto step_(step->state.lock());

                /* Skip previously failed steps that aren't ready
                   to be retried. */
                if (step_->tries > 0 && step_->after > now) {
                    if (step_->after < sleepUntil)
                        sleepUntil = step_->after;
                    continue;
                }

                runnableSorted.push_back({step, step_->lowestBuildID});
            }
        }

        std::sort(runnableSorted.begin(), runnableSorted.end(),
            [](const StepInfo & a, const StepInfo & b) -> bool
            {
                return a.lowestBuildID < b.lowestBuildID;
            });

        /* Find a machine with a free slot and find a step to run
           on it. Once we find such a pair, we restart the outer
           loop because the machine sorting will have changed. */
        keepGoing = false;

        for (auto & mi : machinesSorted) {
            /* Check the live job count, not the copy used for sorting. */
            if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;

            for (auto & si : runnableSorted) {

                /* Can this machine do this step? */
                if (!mi.machine->supportsStep(si.step)) continue;

                /* Let's do this step. Remove it from the runnable
                   list. FIXME: O(n). */
                {
                    auto runnable_(runnable.lock());
                    bool removed = false;
                    for (auto i = runnable_->begin(); i != runnable_->end(); )
                        if (i->lock() == si.step) {
                            i = runnable_->erase(i);
                            removed = true;
                            break;
                        } else ++i;
                    assert(removed);
                }

                /* Make a slot reservation and start a thread to
                   do the build. */
                auto reservation = std::make_shared<MaintainCount>(mi.machine->state->currentJobs);

                auto builderThread = std::thread(&State::builder, this, si.step, mi.machine, reservation);
                builderThread.detach(); // FIXME?

                keepGoing = true;
                break;
            }

            if (keepGoing) break;
        }

    } while (keepGoing);

    return sleepUntil;
}
void State::wakeDispatcher()
{
{
auto dispatcherWakeup_(dispatcherWakeup.lock());
*dispatcherWakeup_ = true;
}
dispatcherWakeupCV.notify_one();
2015-07-21 13:14:17 +00:00
}