diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 192e75a9..492b44a0 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -479,7 +479,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, /* Grab the queued builds from the database, but don't process them yet (since we don't want a long-running transaction). */ - std::list newBuilds; // FIXME: use queue + std::multimap newBuilds; { pqxx::work txn(conn); @@ -498,20 +498,18 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, build->fullJobName = row["project"].as() + ":" + row["jobset"].as() + ":" + row["job"].as(); build->maxSilentTime = row["maxsilent"].as(); build->buildTimeout = row["timeout"].as(); - std::cerr << build->id << " " << build->buildTimeout << std::endl; - newBuilds.push_back(build); + newBuilds.emplace(std::make_pair(build->drvPath, build)); } } - /* Now instantiate build steps for each new build. The builder - threads can start building the runnable build steps right away, - even while we're still processing other new builds. */ - for (auto & build : newBuilds) { - // FIXME: remove build from newBuilds to ensure quick destruction - // FIXME: exception handling + std::set newRunnable; + unsigned int nrAdded; + std::function createBuild; + createBuild = [&](Build::ptr build) { printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName); + nrAdded++; if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ @@ -524,12 +522,26 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, (time(0)) ("derivation was garbage-collected prior to build").exec(); txn.commit(); - continue; + return; } - std::set newSteps, newRunnable; + std::set newSteps; Step::ptr step = createStep(store, build->drvPath, newSteps, newRunnable); + /* Some of the new steps may be the top level of builds that + we haven't processed yet. So do them now. This ensures that + if build A depends on build B with top-level step X, then X + will be "accounted" to B in doBuildStep(). */ + for (auto & r : newSteps) { + while (true) { + auto i = newBuilds.find(r->drvPath); + if (i == newBuilds.end()) break; + Build::ptr b = i->second; + newBuilds.erase(i); + createBuild(b); + } + } + /* If we didn't get a step, it means the step's outputs are all valid. So we mark this as a finished, cached build. */ if (!step) { @@ -543,7 +555,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, markSucceededBuild(txn, build, res, true, now, now); txn.commit(); - continue; + return; } /* If any step has an unsupported system type or has a @@ -591,7 +603,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, } } - if (badStep) continue; + if (badStep) return; /* Note: if we exit this scope prior to this, the build and all newly created steps are destroyed. */ @@ -604,16 +616,30 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, build->toplevel = step; } - printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps, %4% new runnable steps)") - % build->id % step->drvPath % newSteps.size() % newRunnable.size()); + printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") + % build->id % step->drvPath % newSteps.size()); /* Prior to this, the build is not visible to getDependentBuilds(). Now it is, so the build can be failed if a dependency fails. (It can't succeed right away because its top-level is not runnable yet). */ + }; + + /* Now instantiate build steps for each new build. The builder + threads can start building the runnable build steps right away, + even while we're still processing other new builds. */ + while (!newBuilds.empty()) { + auto build = newBuilds.begin()->second; + newBuilds.erase(newBuilds.begin()); + + newRunnable.clear(); + nrAdded = 0; + createBuild(build); + /* Add the new runnable build steps to ‘runnable’ and wake up the builder threads. */ + printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded); for (auto & r : newRunnable) makeRunnable(r); } @@ -1235,8 +1261,6 @@ void State::run() auto queueMonitorThread = std::thread(&State::queueMonitor, this); - sleep(5); - std::thread(&State::dispatcher, this).detach(); queueMonitorThread.join();