forked from lix-project/hydra
Merge pull request #1301 from delroth/queue-runner-perf
queue-runner: only re-sort runnables by prio once per dispatch cycle
This commit is contained in:
commit
874fcae1e8
1 changed files with 106 additions and 99 deletions
|
@ -85,12 +85,113 @@ system_time State::doDispatch()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
system_time now = std::chrono::system_clock::now();
|
||||||
|
|
||||||
/* Start steps until we're out of steps or slots. */
|
/* Start steps until we're out of steps or slots. */
|
||||||
auto sleepUntil = system_time::max();
|
auto sleepUntil = system_time::max();
|
||||||
bool keepGoing;
|
bool keepGoing;
|
||||||
|
|
||||||
|
/* Sort the runnable steps by priority. Priority is establised
|
||||||
|
as follows (in order of precedence):
|
||||||
|
|
||||||
|
- The global priority of the builds that depend on the
|
||||||
|
step. This allows admins to bump a build to the front of
|
||||||
|
the queue.
|
||||||
|
|
||||||
|
- The lowest used scheduling share of the jobsets depending
|
||||||
|
on the step.
|
||||||
|
|
||||||
|
- The local priority of the build, as set via the build's
|
||||||
|
meta.schedulingPriority field. Note that this is not
|
||||||
|
quite correct: the local priority should only be used to
|
||||||
|
establish priority between builds in the same jobset, but
|
||||||
|
here it's used between steps in different jobsets if they
|
||||||
|
happen to have the same lowest used scheduling share. But
|
||||||
|
that's not very likely.
|
||||||
|
|
||||||
|
- The lowest ID of the builds depending on the step;
|
||||||
|
i.e. older builds take priority over new ones.
|
||||||
|
|
||||||
|
FIXME: O(n lg n); obviously, it would be better to keep a
|
||||||
|
runnable queue sorted by priority. */
|
||||||
|
struct StepInfo
|
||||||
|
{
|
||||||
|
Step::ptr step;
|
||||||
|
bool alreadyScheduled = false;
|
||||||
|
|
||||||
|
/* The lowest share used of any jobset depending on this
|
||||||
|
step. */
|
||||||
|
double lowestShareUsed = 1e9;
|
||||||
|
|
||||||
|
/* Info copied from step->state to ensure that the
|
||||||
|
comparator is a partial ordering (see MachineInfo). */
|
||||||
|
int highestGlobalPriority;
|
||||||
|
int highestLocalPriority;
|
||||||
|
BuildID lowestBuildID;
|
||||||
|
|
||||||
|
StepInfo(Step::ptr step, Step::State & step_) : step(step)
|
||||||
|
{
|
||||||
|
for (auto & jobset : step_.jobsets)
|
||||||
|
lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
|
||||||
|
highestGlobalPriority = step_.highestGlobalPriority;
|
||||||
|
highestLocalPriority = step_.highestLocalPriority;
|
||||||
|
lowestBuildID = step_.lowestBuildID;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
std::vector<StepInfo> runnableSorted;
|
||||||
|
|
||||||
|
struct RunnablePerType
|
||||||
|
{
|
||||||
|
unsigned int count{0};
|
||||||
|
std::chrono::seconds waitTime{0};
|
||||||
|
};
|
||||||
|
|
||||||
|
std::unordered_map<std::string, RunnablePerType> runnablePerType;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto runnable_(runnable.lock());
|
||||||
|
runnableSorted.reserve(runnable_->size());
|
||||||
|
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
|
||||||
|
auto step = i->lock();
|
||||||
|
|
||||||
|
/* Remove dead steps. */
|
||||||
|
if (!step) {
|
||||||
|
i = runnable_->erase(i);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
++i;
|
||||||
|
|
||||||
|
auto & r = runnablePerType[step->systemType];
|
||||||
|
r.count++;
|
||||||
|
|
||||||
|
/* Skip previously failed steps that aren't ready
|
||||||
|
to be retried. */
|
||||||
|
auto step_(step->state.lock());
|
||||||
|
r.waitTime += std::chrono::duration_cast<std::chrono::seconds>(now - step_->runnableSince);
|
||||||
|
if (step_->tries > 0 && step_->after > now) {
|
||||||
|
if (step_->after < sleepUntil)
|
||||||
|
sleepUntil = step_->after;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
runnableSorted.emplace_back(step, *step_);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sort(runnableSorted.begin(), runnableSorted.end(),
|
||||||
|
[](const StepInfo & a, const StepInfo & b)
|
||||||
|
{
|
||||||
|
return
|
||||||
|
a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
|
||||||
|
a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
|
||||||
|
a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
|
||||||
|
a.lowestBuildID < b.lowestBuildID;
|
||||||
|
});
|
||||||
|
|
||||||
do {
|
do {
|
||||||
system_time now = std::chrono::system_clock::now();
|
now = std::chrono::system_clock::now();
|
||||||
|
|
||||||
/* Copy the currentJobs field of each machine. This is
|
/* Copy the currentJobs field of each machine. This is
|
||||||
necessary to ensure that the sort comparator below is
|
necessary to ensure that the sort comparator below is
|
||||||
|
@ -138,104 +239,6 @@ system_time State::doDispatch()
|
||||||
a.currentJobs > b.currentJobs;
|
a.currentJobs > b.currentJobs;
|
||||||
});
|
});
|
||||||
|
|
||||||
/* Sort the runnable steps by priority. Priority is establised
|
|
||||||
as follows (in order of precedence):
|
|
||||||
|
|
||||||
- The global priority of the builds that depend on the
|
|
||||||
step. This allows admins to bump a build to the front of
|
|
||||||
the queue.
|
|
||||||
|
|
||||||
- The lowest used scheduling share of the jobsets depending
|
|
||||||
on the step.
|
|
||||||
|
|
||||||
- The local priority of the build, as set via the build's
|
|
||||||
meta.schedulingPriority field. Note that this is not
|
|
||||||
quite correct: the local priority should only be used to
|
|
||||||
establish priority between builds in the same jobset, but
|
|
||||||
here it's used between steps in different jobsets if they
|
|
||||||
happen to have the same lowest used scheduling share. But
|
|
||||||
that's not very likely.
|
|
||||||
|
|
||||||
- The lowest ID of the builds depending on the step;
|
|
||||||
i.e. older builds take priority over new ones.
|
|
||||||
|
|
||||||
FIXME: O(n lg n); obviously, it would be better to keep a
|
|
||||||
runnable queue sorted by priority. */
|
|
||||||
struct StepInfo
|
|
||||||
{
|
|
||||||
Step::ptr step;
|
|
||||||
|
|
||||||
/* The lowest share used of any jobset depending on this
|
|
||||||
step. */
|
|
||||||
double lowestShareUsed = 1e9;
|
|
||||||
|
|
||||||
/* Info copied from step->state to ensure that the
|
|
||||||
comparator is a partial ordering (see MachineInfo). */
|
|
||||||
int highestGlobalPriority;
|
|
||||||
int highestLocalPriority;
|
|
||||||
BuildID lowestBuildID;
|
|
||||||
|
|
||||||
StepInfo(Step::ptr step, Step::State & step_) : step(step)
|
|
||||||
{
|
|
||||||
for (auto & jobset : step_.jobsets)
|
|
||||||
lowestShareUsed = std::min(lowestShareUsed, jobset->shareUsed());
|
|
||||||
highestGlobalPriority = step_.highestGlobalPriority;
|
|
||||||
highestLocalPriority = step_.highestLocalPriority;
|
|
||||||
lowestBuildID = step_.lowestBuildID;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
std::vector<StepInfo> runnableSorted;
|
|
||||||
|
|
||||||
struct RunnablePerType
|
|
||||||
{
|
|
||||||
unsigned int count{0};
|
|
||||||
std::chrono::seconds waitTime{0};
|
|
||||||
};
|
|
||||||
|
|
||||||
std::unordered_map<std::string, RunnablePerType> runnablePerType;
|
|
||||||
|
|
||||||
{
|
|
||||||
auto runnable_(runnable.lock());
|
|
||||||
runnableSorted.reserve(runnable_->size());
|
|
||||||
for (auto i = runnable_->begin(); i != runnable_->end(); ) {
|
|
||||||
auto step = i->lock();
|
|
||||||
|
|
||||||
/* Remove dead steps. */
|
|
||||||
if (!step) {
|
|
||||||
i = runnable_->erase(i);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
++i;
|
|
||||||
|
|
||||||
auto & r = runnablePerType[step->systemType];
|
|
||||||
r.count++;
|
|
||||||
|
|
||||||
/* Skip previously failed steps that aren't ready
|
|
||||||
to be retried. */
|
|
||||||
auto step_(step->state.lock());
|
|
||||||
r.waitTime += std::chrono::duration_cast<std::chrono::seconds>(now - step_->runnableSince);
|
|
||||||
if (step_->tries > 0 && step_->after > now) {
|
|
||||||
if (step_->after < sleepUntil)
|
|
||||||
sleepUntil = step_->after;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
runnableSorted.emplace_back(step, *step_);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sort(runnableSorted.begin(), runnableSorted.end(),
|
|
||||||
[](const StepInfo & a, const StepInfo & b)
|
|
||||||
{
|
|
||||||
return
|
|
||||||
a.highestGlobalPriority != b.highestGlobalPriority ? a.highestGlobalPriority > b.highestGlobalPriority :
|
|
||||||
a.lowestShareUsed != b.lowestShareUsed ? a.lowestShareUsed < b.lowestShareUsed :
|
|
||||||
a.highestLocalPriority != b.highestLocalPriority ? a.highestLocalPriority > b.highestLocalPriority :
|
|
||||||
a.lowestBuildID < b.lowestBuildID;
|
|
||||||
});
|
|
||||||
|
|
||||||
/* Find a machine with a free slot and find a step to run
|
/* Find a machine with a free slot and find a step to run
|
||||||
on it. Once we find such a pair, we restart the outer
|
on it. Once we find such a pair, we restart the outer
|
||||||
loop because the machine sorting will have changed. */
|
loop because the machine sorting will have changed. */
|
||||||
|
@ -245,6 +248,8 @@ system_time State::doDispatch()
|
||||||
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
|
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
|
||||||
|
|
||||||
for (auto & stepInfo : runnableSorted) {
|
for (auto & stepInfo : runnableSorted) {
|
||||||
|
if (stepInfo.alreadyScheduled) continue;
|
||||||
|
|
||||||
auto & step(stepInfo.step);
|
auto & step(stepInfo.step);
|
||||||
|
|
||||||
/* Can this machine do this step? */
|
/* Can this machine do this step? */
|
||||||
|
@ -271,6 +276,8 @@ system_time State::doDispatch()
|
||||||
r.count--;
|
r.count--;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stepInfo.alreadyScheduled = true;
|
||||||
|
|
||||||
/* Make a slot reservation and start a thread to
|
/* Make a slot reservation and start a thread to
|
||||||
do the build. */
|
do the build. */
|
||||||
auto builderThread = std::thread(&State::builder, this,
|
auto builderThread = std::thread(&State::builder, this,
|
||||||
|
|
Loading…
Reference in a new issue