From b7c864c515faee58a11a67ea8619e464e883ad34 Mon Sep 17 00:00:00 2001 From: Pierre Bourdon Date: Fri, 8 Sep 2023 23:38:30 +0200 Subject: [PATCH] queue-runner: only re-sort runnables by prio once per dispatch cycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation was O(N²lg(N)) due to sorting the full runnables priority list once per runnable being scheduled. While not confirmed, this is suspected to cause performance issues and bottlenecking with the queue runner when the runnable list gets large enough. This commit changes the dispatcher to instead only sort runnables per priority once per dispatch cycle. This has the drawback of being less reactive to runnable priority changes: the previous code would react immediately, while this might end up using "old" priorities until the next dispatch cycle. However, dispatch cycles are not supposed to take very long (seconds, not minutes/hours), so this is not expected to have much or any practical impact. Ideally runnables would be maintained in a sorted data structure instead of the current approach of copying + sorting in the scheduler. This would however be a much more invasive change to implement, and might have to wait until we can confirm where the queue runner bottlenecks actually lie. --- src/hydra-queue-runner/dispatcher.cc | 205 ++++++++++++++------------- 1 file changed, 106 insertions(+), 99 deletions(-) diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index 1e40fa69..af21dcbd 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -85,12 +85,113 @@ system_time State::doDispatch() } } + system_time now = std::chrono::system_clock::now(); + /* Start steps until we're out of steps or slots. */ auto sleepUntil = system_time::max(); bool keepGoing; + /* Sort the runnable steps by priority. Priority is establised + as follows (in order of precedence): + + - The global priority of the builds that depend on the + step. This allows admins to bump a build to the front of + the queue. + + - The lowest used scheduling share of the jobsets depending + on the step. + + - The local priority of the build, as set via the build's + meta.schedulingPriority field. Note that this is not + quite correct: the local priority should only be used to + establish priority between builds in the same jobset, but + here it's used between steps in different jobsets if they + happen to have the same lowest used scheduling share. But + that's not very likely. + + - The lowest ID of the builds depending on the step; + i.e. older builds take priority over new ones. + + FIXME: O(n lg n); obviously, it would be better to keep a + runnable queue sorted by priority. */ + struct StepInfo + { + Step::ptr step; + bool alreadyScheduled = false; + + /* The lowest share used of any jobset depending on this + step. */ + double lowestShareUsed = 1e9; + + /* Info copied from step->state to ensure that the + comparator is a partial ordering (see MachineInfo). */ + int highestGlobalPriority; + int highestLocalPriority; + BuildID lowestBuildID; + + StepInfo(Step::ptr step, Step::State & step_) : step(step) + { + for (auto & jobset : step_.jobsets) + lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed()); + highestGlobalPriority = step_.highestGlobalPriority; + highestLocalPriority = step_.highestLocalPriority; + lowestBuildID = step_.lowestBuildID; + } + }; + + std::vector runnableSorted; + + struct RunnablePerType + { + unsigned int count{0}; + std::chrono::seconds waitTime{0}; + }; + + std::unordered_map runnablePerType; + + { + auto runnable_(runnable.lock()); + runnableSorted.reserve(runnable_->size()); + for (auto i = runnable_->begin(); i != runnable_->end(); ) { + auto step = i->lock(); + + /* Remove dead steps. */ + if (!step) { + i = runnable_->erase(i); + continue; + } + + ++i; + + auto & r = runnablePerType[step->systemType]; + r.count++; + + /* Skip previously failed steps that aren't ready + to be retried. */ + auto step_(step->state.lock()); + r.waitTime += std::chrono::duration_cast(now - step_->runnableSince); + if (step_->tries > 0 && step_->after > now) { + if (step_->after < sleepUntil) + sleepUntil = step_->after; + continue; + } + + runnableSorted.emplace_back(step, *step_); + } + } + + sort(runnableSorted.begin(), runnableSorted.end(), + [](const StepInfo & a, const StepInfo & b) + { + return + a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority : + a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed : + a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority : + a.lowestBuildID < b.lowestBuildID; + }); + do { - system_time now = std::chrono::system_clock::now(); + now = std::chrono::system_clock::now(); /* Copy the currentJobs field of each machine. This is necessary to ensure that the sort comparator below is @@ -138,104 +239,6 @@ system_time State::doDispatch() a.currentJobs > b.currentJobs; }); - /* Sort the runnable steps by priority. Priority is establised - as follows (in order of precedence): - - - The global priority of the builds that depend on the - step. This allows admins to bump a build to the front of - the queue. - - - The lowest used scheduling share of the jobsets depending - on the step. - - - The local priority of the build, as set via the build's - meta.schedulingPriority field. Note that this is not - quite correct: the local priority should only be used to - establish priority between builds in the same jobset, but - here it's used between steps in different jobsets if they - happen to have the same lowest used scheduling share. But - that's not very likely. - - - The lowest ID of the builds depending on the step; - i.e. older builds take priority over new ones. - - FIXME: O(n lg n); obviously, it would be better to keep a - runnable queue sorted by priority. */ - struct StepInfo - { - Step::ptr step; - - /* The lowest share used of any jobset depending on this - step. */ - double lowestShareUsed = 1e9; - - /* Info copied from step->state to ensure that the - comparator is a partial ordering (see MachineInfo). */ - int highestGlobalPriority; - int highestLocalPriority; - BuildID lowestBuildID; - - StepInfo(Step::ptr step, Step::State & step_) : step(step) - { - for (auto & jobset : step_.jobsets) - lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed()); - highestGlobalPriority = step_.highestGlobalPriority; - highestLocalPriority = step_.highestLocalPriority; - lowestBuildID = step_.lowestBuildID; - } - }; - - std::vector runnableSorted; - - struct RunnablePerType - { - unsigned int count{0}; - std::chrono::seconds waitTime{0}; - }; - - std::unordered_map runnablePerType; - - { - auto runnable_(runnable.lock()); - runnableSorted.reserve(runnable_->size()); - for (auto i = runnable_->begin(); i != runnable_->end(); ) { - auto step = i->lock(); - - /* Remove dead steps. */ - if (!step) { - i = runnable_->erase(i); - continue; - } - - ++i; - - auto & r = runnablePerType[step->systemType]; - r.count++; - - /* Skip previously failed steps that aren't ready - to be retried. */ - auto step_(step->state.lock()); - r.waitTime += std::chrono::duration_cast(now - step_->runnableSince); - if (step_->tries > 0 && step_->after > now) { - if (step_->after < sleepUntil) - sleepUntil = step_->after; - continue; - } - - runnableSorted.emplace_back(step, *step_); - } - } - - sort(runnableSorted.begin(), runnableSorted.end(), - [](const StepInfo & a, const StepInfo & b) - { - return - a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority : - a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed : - a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority : - a.lowestBuildID < b.lowestBuildID; - }); - /* Find a machine with a free slot and find a step to run on it. Once we find such a pair, we restart the outer loop because the machine sorting will have changed. */ @@ -245,6 +248,8 @@ system_time State::doDispatch() if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; for (auto & stepInfo : runnableSorted) { + if (stepInfo.alreadyScheduled) continue; + auto & step(stepInfo.step); /* Can this machine do this step? */ @@ -271,6 +276,8 @@ system_time State::doDispatch() r.count--; } + stepInfo.alreadyScheduled = true; + /* Make a slot reservation and start a thread to do the build. */ auto builderThread = std::thread(&State::builder, this,