diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 49ba7ec6..b7c6ff8d 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -59,12 +59,17 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, /* Grab the queued builds from the database, but don't process them yet (since we don't want a long-running transaction). */ - std::multimap newBuilds; + std::vector newIDs; + std::map newBuildsByID; + std::multimap newBuildsByPath; { pqxx::work txn(conn); - auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); + auto res = txn.parameterized + ("select id, project, jobset, job, drvPath, maxsilent, timeout, timestamp, globalPriority from Builds " + "where id > $1 and finished = 0 order by globalPriority desc, id") + (lastBuildId).exec(); for (auto const & row : res) { auto builds_(builds.lock()); @@ -85,7 +90,9 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, build->globalPriority = row["globalPriority"].as(); build->jobset = createJobset(txn, build->projectName, build->jobsetName); - newBuilds.emplace(std::make_pair(build->drvPath, build)); + newIDs.push_back(id); + newBuildsByID[id] = build; + newBuildsByPath.emplace(std::make_pair(build->drvPath, id)); } } @@ -96,6 +103,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, createBuild = [&](Build::ptr build) { printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName()); nrAdded++; + newBuildsByID.erase(build->id); if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ @@ -124,13 +132,11 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, if build A depends on build B with top-level step X, then X will be "accounted" to B in doBuildStep(). */ for (auto & r : newSteps) { - while (true) { - auto i = newBuilds.find(r->drvPath); - if (i == newBuilds.end()) break; - Build::ptr b = i->second; - newBuilds.erase(i); - createBuild(b); - } + auto i = newBuildsByPath.find(r->drvPath); + if (i == newBuildsByPath.end()) continue; + auto j = newBuildsByID.find(i->second); + if (j == newBuildsByID.end()) continue; + createBuild(j->second); } /* If we didn't get a step, it means the step's outputs are @@ -219,9 +225,10 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, /* Now instantiate build steps for each new build. The builder threads can start building the runnable build steps right away, even while we're still processing other new builds. */ - while (!newBuilds.empty()) { - auto build = newBuilds.begin()->second; - newBuilds.erase(newBuilds.begin()); + for (auto id : newIDs) { + auto i = newBuildsByID.find(id); + if (i == newBuildsByID.end()) continue; + auto build = i->second; newRunnable.clear(); nrAdded = 0;