diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index 1e40fa69..af21dcbd 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -85,12 +85,113 @@ system_time State::doDispatch() } } + system_time now = std::chrono::system_clock::now(); + /* Start steps until we're out of steps or slots. */ auto sleepUntil = system_time::max(); bool keepGoing; + /* Sort the runnable steps by priority. Priority is establised + as follows (in order of precedence): + + - The global priority of the builds that depend on the + step. This allows admins to bump a build to the front of + the queue. + + - The lowest used scheduling share of the jobsets depending + on the step. + + - The local priority of the build, as set via the build's + meta.schedulingPriority field. Note that this is not + quite correct: the local priority should only be used to + establish priority between builds in the same jobset, but + here it's used between steps in different jobsets if they + happen to have the same lowest used scheduling share. But + that's not very likely. + + - The lowest ID of the builds depending on the step; + i.e. older builds take priority over new ones. + + FIXME: O(n lg n); obviously, it would be better to keep a + runnable queue sorted by priority. */ + struct StepInfo + { + Step::ptr step; + bool alreadyScheduled = false; + + /* The lowest share used of any jobset depending on this + step. */ + double lowestShareUsed = 1e9; + + /* Info copied from step->state to ensure that the + comparator is a partial ordering (see MachineInfo). */ + int highestGlobalPriority; + int highestLocalPriority; + BuildID lowestBuildID; + + StepInfo(Step::ptr step, Step::State & step_) : step(step) + { + for (auto & jobset : step_.jobsets) + lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed()); + highestGlobalPriority = step_.highestGlobalPriority; + highestLocalPriority = step_.highestLocalPriority; + lowestBuildID = step_.lowestBuildID; + } + }; + + std::vector runnableSorted; + + struct RunnablePerType + { + unsigned int count{0}; + std::chrono::seconds waitTime{0}; + }; + + std::unordered_map runnablePerType; + + { + auto runnable_(runnable.lock()); + runnableSorted.reserve(runnable_->size()); + for (auto i = runnable_->begin(); i != runnable_->end(); ) { + auto step = i->lock(); + + /* Remove dead steps. */ + if (!step) { + i = runnable_->erase(i); + continue; + } + + ++i; + + auto & r = runnablePerType[step->systemType]; + r.count++; + + /* Skip previously failed steps that aren't ready + to be retried. */ + auto step_(step->state.lock()); + r.waitTime += std::chrono::duration_cast(now - step_->runnableSince); + if (step_->tries > 0 && step_->after > now) { + if (step_->after < sleepUntil) + sleepUntil = step_->after; + continue; + } + + runnableSorted.emplace_back(step, *step_); + } + } + + sort(runnableSorted.begin(), runnableSorted.end(), + [](const StepInfo & a, const StepInfo & b) + { + return + a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority : + a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed : + a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority : + a.lowestBuildID < b.lowestBuildID; + }); + do { - system_time now = std::chrono::system_clock::now(); + now = std::chrono::system_clock::now(); /* Copy the currentJobs field of each machine. This is necessary to ensure that the sort comparator below is @@ -138,104 +239,6 @@ system_time State::doDispatch() a.currentJobs > b.currentJobs; }); - /* Sort the runnable steps by priority. Priority is establised - as follows (in order of precedence): - - - The global priority of the builds that depend on the - step. This allows admins to bump a build to the front of - the queue. - - - The lowest used scheduling share of the jobsets depending - on the step. - - - The local priority of the build, as set via the build's - meta.schedulingPriority field. Note that this is not - quite correct: the local priority should only be used to - establish priority between builds in the same jobset, but - here it's used between steps in different jobsets if they - happen to have the same lowest used scheduling share. But - that's not very likely. - - - The lowest ID of the builds depending on the step; - i.e. older builds take priority over new ones. - - FIXME: O(n lg n); obviously, it would be better to keep a - runnable queue sorted by priority. */ - struct StepInfo - { - Step::ptr step; - - /* The lowest share used of any jobset depending on this - step. */ - double lowestShareUsed = 1e9; - - /* Info copied from step->state to ensure that the - comparator is a partial ordering (see MachineInfo). */ - int highestGlobalPriority; - int highestLocalPriority; - BuildID lowestBuildID; - - StepInfo(Step::ptr step, Step::State & step_) : step(step) - { - for (auto & jobset : step_.jobsets) - lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed()); - highestGlobalPriority = step_.highestGlobalPriority; - highestLocalPriority = step_.highestLocalPriority; - lowestBuildID = step_.lowestBuildID; - } - }; - - std::vector runnableSorted; - - struct RunnablePerType - { - unsigned int count{0}; - std::chrono::seconds waitTime{0}; - }; - - std::unordered_map runnablePerType; - - { - auto runnable_(runnable.lock()); - runnableSorted.reserve(runnable_->size()); - for (auto i = runnable_->begin(); i != runnable_->end(); ) { - auto step = i->lock(); - - /* Remove dead steps. */ - if (!step) { - i = runnable_->erase(i); - continue; - } - - ++i; - - auto & r = runnablePerType[step->systemType]; - r.count++; - - /* Skip previously failed steps that aren't ready - to be retried. */ - auto step_(step->state.lock()); - r.waitTime += std::chrono::duration_cast(now - step_->runnableSince); - if (step_->tries > 0 && step_->after > now) { - if (step_->after < sleepUntil) - sleepUntil = step_->after; - continue; - } - - runnableSorted.emplace_back(step, *step_); - } - } - - sort(runnableSorted.begin(), runnableSorted.end(), - [](const StepInfo & a, const StepInfo & b) - { - return - a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority : - a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed : - a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority : - a.lowestBuildID < b.lowestBuildID; - }); - /* Find a machine with a free slot and find a step to run on it. Once we find such a pair, we restart the outer loop because the machine sorting will have changed. */ @@ -245,6 +248,8 @@ system_time State::doDispatch() if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; for (auto & stepInfo : runnableSorted) { + if (stepInfo.alreadyScheduled) continue; + auto & step(stepInfo.step); /* Can this machine do this step? */ @@ -271,6 +276,8 @@ system_time State::doDispatch() r.count--; } + stepInfo.alreadyScheduled = true; + /* Make a slot reservation and start a thread to do the build. */ auto builderThread = std::thread(&State::builder, this,