Revive jobset scheduling

(I.e. taking the jobset scheduling share into account.)
This commit is contained in:
Eelco Dolstra 2015-08-11 01:30:24 +02:00
parent 08739a2a5a
commit 97f11baa8d
5 changed files with 154 additions and 4 deletions

View file

@ -131,6 +131,16 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
time_t stepStopTime = time(0);
if (!result.stopTime) result.stopTime = stepStopTime;
/* Account the time we spent building this step by dividing it
among the jobsets that depend on it. */
{
auto step_(step->state.lock());
// FIXME: loss of precision.
time_t charge = (result.stopTime - result.startTime) / step_->jobsets.size();
for (auto & jobset : step_->jobsets)
jobset->addStep(result.startTime, charge);
}
/* Asynchronously compress the log. */
if (result.logFile != "") {
{

View file

@ -1,4 +1,3 @@
#include <iostream>
#include <algorithm>
#include <thread>
@ -54,6 +53,19 @@ void State::dispatcher()
system_time State::doDispatch()
{
/* Prune old historical build step info from the jobsets. */
{
auto jobsets_(jobsets.lock());
for (auto & jobset : *jobsets_) {
auto s1 = jobset.second->shareUsed();
jobset.second->pruneSteps();
auto s2 = jobset.second->shareUsed();
if (s1 != s2)
printMsg(lvlDebug, format("pruned scheduling window of %1%:%2% from %3% to %4%")
% jobset.first.first % jobset.first.second % s1 % s2);
}
}
/* Start steps until we're out of steps or slots. */
auto sleepUntil = system_time::max();
bool keepGoing;
@ -139,6 +151,13 @@ system_time State::doDispatch()
}
}
for (auto & step : runnableSorted) {
auto step_(step->state.lock());
step_->lowestShareUsed = 1e9;
for (auto & jobset : step_->jobsets)
step_->lowestShareUsed = std::min(step_->lowestShareUsed, jobset->shareUsed());
}
sort(runnableSorted.begin(), runnableSorted.end(),
[](const Step::ptr & a, const Step::ptr & b)
{
@ -146,6 +165,7 @@ system_time State::doDispatch()
auto b_(b->state.lock()); // FIXME: deadlock?
return
a_->highestGlobalPriority != b_->highestGlobalPriority ? a_->highestGlobalPriority > b_->highestGlobalPriority :
a_->lowestShareUsed != b_->lowestShareUsed ? a_->lowestShareUsed < b_->lowestShareUsed :
a_->lowestBuildID < b_->lowestBuildID;
});
@ -204,3 +224,24 @@ void State::wakeDispatcher()
}
dispatcherWakeupCV.notify_one();
}
void Jobset::addStep(time_t startTime, time_t duration)
{
auto steps_(steps.lock());
(*steps_)[startTime] = duration;
seconds += duration;
}
void Jobset::pruneSteps()
{
time_t now = time(0);
auto steps_(steps.lock());
while (!steps_->empty()) {
auto i = steps_->begin();
if (i->first > now - schedulingWindow) break;
seconds -= i->second;
steps_->erase(i);
}
}

View file

@ -462,6 +462,17 @@ void State::dumpStatus(Connection & conn, bool log)
}
}
}
{
root.attr("jobsets");
JSONObject nested(out);
auto jobsets_(jobsets.lock());
for (auto & jobset : *jobsets_) {
nested.attr(jobset.first.first + ":" + jobset.first.second);
JSONObject nested2(out);
nested2.attr("shareUsed"); out << jobset.second->shareUsed();
nested2.attr("seconds", jobset.second->getSeconds());
}
}
}
if (log) printMsg(lvlInfo, format("status: %1%") % out.str());

View file

@ -49,7 +49,6 @@ void State::queueMonitorLoop()
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
processQueueChange(*conn);
}
}
}
@ -84,6 +83,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
build->buildTimeout = row["timeout"].as<int>();
build->timestamp = row["timestamp"].as<time_t>();
build->globalPriority = row["globalPriority"].as<int>();
build->jobset = createJobset(txn, build->projectName, build->jobsetName);
newBuilds.emplace(std::make_pair(build->drvPath, build));
}
@ -210,6 +210,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
build->toplevel = step;
}
build->propagatePriorities();
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
% build->id % step->drvPath % newSteps.size());
};
@ -230,8 +232,6 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
throw;
}
build->propagatePriorities();
/* Add the new runnable build steps to runnable and wake up
the builder threads. */
printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded);
@ -253,6 +253,7 @@ void Build::propagatePriorities()
auto step_(step->state.lock());
step_->highestGlobalPriority = std::max(step_->highestGlobalPriority, globalPriority);
step_->lowestBuildID = std::min(step_->lowestBuildID, id);
step_->jobsets.insert(jobset);
}, toplevel);
}
@ -395,3 +396,39 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
return step;
}
Jobset::ptr State::createJobset(pqxx::work & txn,
const std::string & projectName, const std::string & jobsetName)
{
auto jobsets_(jobsets.lock());
auto p = std::make_pair(projectName, jobsetName);
auto i = jobsets_->find(p);
if (i != jobsets_->end()) return i->second;
auto res = txn.parameterized
("select schedulingShares from Jobsets where project = $1 and name = $2")
(projectName)(jobsetName).exec();
if (res.empty()) throw Error("missing jobset - can't happen");
auto shares = res[0]["schedulingShares"].as<unsigned int>();
if (shares == 0) shares = 1;
auto jobset = std::make_shared<Jobset>(shares);
/* Load the build steps from the last 24 hours. */
res = txn.parameterized
("select s.startTime, s.stopTime from BuildSteps s join Builds b on build = id "
"where s.startTime is not null and s.stopTime > $1 and project = $2 and jobset = $3")
(time(0) - Jobset::schedulingWindow * 10)(projectName)(jobsetName).exec();
for (auto const & row : res) {
time_t startTime = row["startTime"].as<time_t>();
time_t stopTime = row["stopTime"].as<time_t>();
jobset->addStep(startTime, stopTime - startTime);
}
(*jobsets_)[p] = jobset;
return jobset;
}

View file

@ -60,6 +60,40 @@ struct Step;
struct BuildOutput;
class Jobset
{
public:
typedef std::shared_ptr<Jobset> ptr;
typedef std::weak_ptr<Jobset> wptr;
Jobset(unsigned int shares) : shares(shares) { }
static const time_t schedulingWindow = 24 * 60 * 60;
private:
std::atomic<time_t> seconds{0};
std::atomic<unsigned int> shares;
/* The start time and duration of the most recent build steps. */
Sync<std::map<time_t, time_t>> steps;
public:
double shareUsed()
{
return (double) seconds / shares;
}
time_t getSeconds() { return seconds; }
void addStep(time_t startTime, time_t duration);
void pruneSteps();
};
struct Build
{
typedef std::shared_ptr<Build> ptr;
@ -75,6 +109,8 @@ struct Build
std::shared_ptr<Step> toplevel;
Jobset::ptr jobset;
std::atomic_bool finishedInDB{false};
std::string fullJobName()
@ -110,6 +146,10 @@ struct Step
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
/* Jobsets to which this step belongs. Used for determining
scheduling priority. */
std::set<Jobset::ptr> jobsets;
/* Number of times we've tried this step. */
unsigned int tries = 0;
@ -120,6 +160,10 @@ struct Step
step. */
int highestGlobalPriority{0};
/* The lowest share used of any jobset depending on this
step. */
double lowestShareUsed;
/* The lowest ID of any build depending on this step. */
BuildID lowestBuildID{std::numeric_limits<BuildID>::max()};
};
@ -203,6 +247,10 @@ private:
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* The jobsets. */
typedef std::map<std::pair<std::string, std::string>, Jobset::ptr> Jobsets;
Sync<Jobsets> jobsets;
/* All active or pending build steps (i.e. dependencies of the
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
@ -299,6 +347,9 @@ private:
Build::ptr referringBuild, Step::ptr referringStep, std::set<nix::Path> & finishedDrvs,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
Jobset::ptr createJobset(pqxx::work & txn,
const std::string & projectName, const std::string & jobsetName);
void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */