diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc index 56922719..2867f1f0 100644 --- a/src/hydra-queue-runner/builder.cc +++ b/src/hydra-queue-runner/builder.cc @@ -131,6 +131,16 @@ bool State::doBuildStep(std::shared_ptr store, Step::ptr step, time_t stepStopTime = time(0); if (!result.stopTime) result.stopTime = stepStopTime; + /* Account the time we spent building this step by dividing it + among the jobsets that depend on it. */ + { + auto step_(step->state.lock()); + // FIXME: loss of precision. + time_t charge = (result.stopTime - result.startTime) / step_->jobsets.size(); + for (auto & jobset : step_->jobsets) + jobset->addStep(result.startTime, charge); + } + /* Asynchronously compress the log. */ if (result.logFile != "") { { diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc index fbe53848..e71c5677 100644 --- a/src/hydra-queue-runner/dispatcher.cc +++ b/src/hydra-queue-runner/dispatcher.cc @@ -1,4 +1,3 @@ -#include #include #include @@ -54,6 +53,19 @@ void State::dispatcher() system_time State::doDispatch() { + /* Prune old historical build step info from the jobsets. */ + { + auto jobsets_(jobsets.lock()); + for (auto & jobset : *jobsets_) { + auto s1 = jobset.second->shareUsed(); + jobset.second->pruneSteps(); + auto s2 = jobset.second->shareUsed(); + if (s1 != s2) + printMsg(lvlDebug, format("pruned scheduling window of ‘%1%:%2%’ from %3% to %4%") + % jobset.first.first % jobset.first.second % s1 % s2); + } + } + /* Start steps until we're out of steps or slots. */ auto sleepUntil = system_time::max(); bool keepGoing; @@ -139,6 +151,13 @@ system_time State::doDispatch() } } + for (auto & step : runnableSorted) { + auto step_(step->state.lock()); + step_->lowestShareUsed = 1e9; + for (auto & jobset : step_->jobsets) + step_->lowestShareUsed = std::min(step_->lowestShareUsed, jobset->shareUsed()); + } + sort(runnableSorted.begin(), runnableSorted.end(), [](const Step::ptr & a, const Step::ptr & b) { @@ -146,6 +165,7 @@ system_time State::doDispatch() auto b_(b->state.lock()); // FIXME: deadlock? return a_->highestGlobalPriority != b_->highestGlobalPriority ? a_->highestGlobalPriority > b_->highestGlobalPriority : + a_->lowestShareUsed != b_->lowestShareUsed ? a_->lowestShareUsed < b_->lowestShareUsed : a_->lowestBuildID < b_->lowestBuildID; }); @@ -204,3 +224,24 @@ void State::wakeDispatcher() } dispatcherWakeupCV.notify_one(); } + + +void Jobset::addStep(time_t startTime, time_t duration) +{ + auto steps_(steps.lock()); + (*steps_)[startTime] = duration; + seconds += duration; +} + + +void Jobset::pruneSteps() +{ + time_t now = time(0); + auto steps_(steps.lock()); + while (!steps_->empty()) { + auto i = steps_->begin(); + if (i->first > now - schedulingWindow) break; + seconds -= i->second; + steps_->erase(i); + } +} diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 9a7bdcbc..3fe608fc 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -462,6 +462,17 @@ void State::dumpStatus(Connection & conn, bool log) } } } + { + root.attr("jobsets"); + JSONObject nested(out); + auto jobsets_(jobsets.lock()); + for (auto & jobset : *jobsets_) { + nested.attr(jobset.first.first + ":" + jobset.first.second); + JSONObject nested2(out); + nested2.attr("shareUsed"); out << jobset.second->shareUsed(); + nested2.attr("seconds", jobset.second->getSeconds()); + } + } } if (log) printMsg(lvlInfo, format("status: %1%") % out.str()); diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index b36d0875..49ba7ec6 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -49,7 +49,6 @@ void State::queueMonitorLoop() printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); processQueueChange(*conn); } - } } @@ -84,6 +83,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, build->buildTimeout = row["timeout"].as(); build->timestamp = row["timestamp"].as(); build->globalPriority = row["globalPriority"].as(); + build->jobset = createJobset(txn, build->projectName, build->jobsetName); newBuilds.emplace(std::make_pair(build->drvPath, build)); } @@ -210,6 +210,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, build->toplevel = step; } + build->propagatePriorities(); + printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") % build->id % step->drvPath % newSteps.size()); }; @@ -230,8 +232,6 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, throw; } - build->propagatePriorities(); - /* Add the new runnable build steps to ‘runnable’ and wake up the builder threads. */ printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded); @@ -253,6 +253,7 @@ void Build::propagatePriorities() auto step_(step->state.lock()); step_->highestGlobalPriority = std::max(step_->highestGlobalPriority, globalPriority); step_->lowestBuildID = std::min(step_->lowestBuildID, id); + step_->jobsets.insert(jobset); }, toplevel); } @@ -395,3 +396,39 @@ Step::ptr State::createStep(std::shared_ptr store, const Path & drvPat return step; } + + +Jobset::ptr State::createJobset(pqxx::work & txn, + const std::string & projectName, const std::string & jobsetName) +{ + auto jobsets_(jobsets.lock()); + + auto p = std::make_pair(projectName, jobsetName); + + auto i = jobsets_->find(p); + if (i != jobsets_->end()) return i->second; + + auto res = txn.parameterized + ("select schedulingShares from Jobsets where project = $1 and name = $2") + (projectName)(jobsetName).exec(); + if (res.empty()) throw Error("missing jobset - can't happen"); + + auto shares = res[0]["schedulingShares"].as(); + if (shares == 0) shares = 1; + + auto jobset = std::make_shared(shares); + + /* Load the build steps from the last 24 hours. */ + res = txn.parameterized + ("select s.startTime, s.stopTime from BuildSteps s join Builds b on build = id " + "where s.startTime is not null and s.stopTime > $1 and project = $2 and jobset = $3") + (time(0) - Jobset::schedulingWindow * 10)(projectName)(jobsetName).exec(); + for (auto const & row : res) { + time_t startTime = row["startTime"].as(); + time_t stopTime = row["stopTime"].as(); + jobset->addStep(startTime, stopTime - startTime); + } + + (*jobsets_)[p] = jobset; + return jobset; +} diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 746f1db5..aea7e566 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -60,6 +60,40 @@ struct Step; struct BuildOutput; +class Jobset +{ +public: + + typedef std::shared_ptr ptr; + typedef std::weak_ptr wptr; + + Jobset(unsigned int shares) : shares(shares) { } + + static const time_t schedulingWindow = 24 * 60 * 60; + +private: + + std::atomic seconds{0}; + std::atomic shares; + + /* The start time and duration of the most recent build steps. */ + Sync> steps; + +public: + + double shareUsed() + { + return (double) seconds / shares; + } + + time_t getSeconds() { return seconds; } + + void addStep(time_t startTime, time_t duration); + + void pruneSteps(); +}; + + struct Build { typedef std::shared_ptr ptr; @@ -75,6 +109,8 @@ struct Build std::shared_ptr toplevel; + Jobset::ptr jobset; + std::atomic_bool finishedInDB{false}; std::string fullJobName() @@ -110,6 +146,10 @@ struct Step /* Builds that have this step as the top-level derivation. */ std::vector builds; + /* Jobsets to which this step belongs. Used for determining + scheduling priority. */ + std::set jobsets; + /* Number of times we've tried this step. */ unsigned int tries = 0; @@ -120,6 +160,10 @@ struct Step step. */ int highestGlobalPriority{0}; + /* The lowest share used of any jobset depending on this + step. */ + double lowestShareUsed; + /* The lowest ID of any build depending on this step. */ BuildID lowestBuildID{std::numeric_limits::max()}; }; @@ -203,6 +247,10 @@ private: typedef std::map Builds; Sync builds; + /* The jobsets. */ + typedef std::map, Jobset::ptr> Jobsets; + Sync jobsets; + /* All active or pending build steps (i.e. dependencies of the queued builds). Note that these are weak pointers. Steps are kept alive by being reachable from Builds or by being in @@ -299,6 +347,9 @@ private: Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, std::set & newSteps, std::set & newRunnable); + Jobset::ptr createJobset(pqxx::work & txn, + const std::string & projectName, const std::string & jobsetName); + void makeRunnable(Step::ptr step); /* The thread that selects and starts runnable builds. */