queue-runner: only re-sort runnables by prio once per dispatch cycle

The previous implementation was O(N²lg(N)) due to sorting the full
runnables priority list once per runnable being scheduled. While not
confirmed, this is suspected to cause performance issues and
bottlenecking with the queue runner when the runnable list gets large
enough.

This commit changes the dispatcher to instead only sort runnables per
priority once per dispatch cycle. This has the drawback of being less
reactive to runnable priority changes: the previous code would react
immediately, while this might end up using "old" priorities until the
next dispatch cycle. However, dispatch cycles are not supposed to take
very long (seconds, not minutes/hours), so this is not expected to have
much or any practical impact.

Ideally runnables would be maintained in a sorted data structure instead
of the current approach of copying + sorting in the scheduler. This
would however be a much more invasive change to implement, and might
have to wait until we can confirm where the queue runner bottlenecks
actually lie.
This commit is contained in:
Pierre Bourdon 2023-09-08 23:38:30 +02:00
parent 00d30874da
commit b7c864c515
Signed by: delroth
GPG key ID: 6FB80DCD84DA0F1C

View file

@ -85,59 +85,12 @@ system_time State::doDispatch()
} }
} }
system_time now = std::chrono::system_clock::now();
/* Start steps until we're out of steps or slots. */ /* Start steps until we're out of steps or slots. */
auto sleepUntil = system_time::max(); auto sleepUntil = system_time::max();
bool keepGoing; bool keepGoing;
do {
system_time now = std::chrono::system_clock::now();
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. Also
filter out temporarily disabled machines. */
struct MachineInfo
{
Machine::ptr machine;
unsigned long currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto info(m.second->state->connectInfo.lock());
if (!m.second->enabled) continue;
if (info->consecutiveFailures && info->disabledUntil > now) {
if (info->disabledUntil < sleepUntil)
sleepUntil = info->disabledUntil;
continue;
}
machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
}
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool
{
float ta = std::round(a.currentJobs / a.machine->speedFactor);
float tb = std::round(b.currentJobs / b.machine->speedFactor);
return
ta != tb ? ta < tb :
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.currentJobs > b.currentJobs;
});
/* Sort the runnable steps by priority. Priority is establised /* Sort the runnable steps by priority. Priority is establised
as follows (in order of precedence): as follows (in order of precedence):
@ -164,6 +117,7 @@ system_time State::doDispatch()
struct StepInfo struct StepInfo
{ {
Step::ptr step; Step::ptr step;
bool alreadyScheduled = false;
/* The lowest share used of any jobset depending on this /* The lowest share used of any jobset depending on this
step. */ step. */
@ -236,6 +190,55 @@ system_time State::doDispatch()
a.lowestBuildID < b.lowestBuildID; a.lowestBuildID < b.lowestBuildID;
}); });
do {
now = std::chrono::system_clock::now();
/* Copy the currentJobs field of each machine. This is
necessary to ensure that the sort comparator below is
an ordering. std::sort() can segfault if it isn't. Also
filter out temporarily disabled machines. */
struct MachineInfo
{
Machine::ptr machine;
unsigned long currentJobs;
};
std::vector<MachineInfo> machinesSorted;
{
auto machines_(machines.lock());
for (auto & m : *machines_) {
auto info(m.second->state->connectInfo.lock());
if (!m.second->enabled) continue;
if (info->consecutiveFailures && info->disabledUntil > now) {
if (info->disabledUntil < sleepUntil)
sleepUntil = info->disabledUntil;
continue;
}
machinesSorted.push_back({m.second, m.second->state->currentJobs});
}
}
/* Sort the machines by a combination of speed factor and
available slots. Prioritise the available machines as
follows:
- First by load divided by speed factor, rounded to the
nearest integer. This causes fast machines to be
preferred over slow machines with similar loads.
- Then by speed factor.
- Finally by load. */
sort(machinesSorted.begin(), machinesSorted.end(),
[](const MachineInfo & a, const MachineInfo & b) -> bool
{
float ta = std::round(a.currentJobs / a.machine->speedFactor);
float tb = std::round(b.currentJobs / b.machine->speedFactor);
return
ta != tb ? ta < tb :
a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor :
a.currentJobs > b.currentJobs;
});
/* Find a machine with a free slot and find a step to run /* Find a machine with a free slot and find a step to run
on it. Once we find such a pair, we restart the outer on it. Once we find such a pair, we restart the outer
loop because the machine sorting will have changed. */ loop because the machine sorting will have changed. */
@ -245,6 +248,8 @@ system_time State::doDispatch()
if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue;
for (auto & stepInfo : runnableSorted) { for (auto & stepInfo : runnableSorted) {
if (stepInfo.alreadyScheduled) continue;
auto & step(stepInfo.step); auto & step(stepInfo.step);
/* Can this machine do this step? */ /* Can this machine do this step? */
@ -271,6 +276,8 @@ system_time State::doDispatch()
r.count--; r.count--;
} }
stepInfo.alreadyScheduled = true;
/* Make a slot reservation and start a thread to /* Make a slot reservation and start a thread to
do the build. */ do the build. */
auto builderThread = std::thread(&State::builder, this, auto builderThread = std::thread(&State::builder, this,