From 7e026d35f7a7f66151a56ed72aa4436ee9b345c9 Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Tue, 21 Jul 2015 15:14:17 +0200 Subject: [PATCH] Split hydra-queue-runner.cc more --- src/hydra-queue-runner/Makefile.am | 3 +- src/hydra-queue-runner/builder.cc | 378 ++++++++ src/hydra-queue-runner/dispatcher.cc | 155 ++++ src/hydra-queue-runner/hydra-queue-runner.cc | 904 +------------------ src/hydra-queue-runner/queue-monitor.cc | 369 ++++++++ src/hydra-queue-runner/state.hh | 16 + 6 files changed, 921 insertions(+), 904 deletions(-) create mode 100644 src/hydra-queue-runner/builder.cc create mode 100644 src/hydra-queue-runner/dispatcher.cc create mode 100644 src/hydra-queue-runner/queue-monitor.cc diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am index f947ddc4..089187d1 100644 --- a/src/hydra-queue-runner/Makefile.am +++ b/src/hydra-queue-runner/Makefile.am @@ -1,6 +1,7 @@ bin_PROGRAMS = hydra-queue-runner -hydra_queue_runner_SOURCES = hydra-queue-runner.cc build-result.cc build-remote.cc \ +hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.cc \ + builder.cc build-result.cc build-remote.cc \ build-result.hh counter.hh pool.hh sync.hh token-server.hh state.hh db.hh hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx diff --git a/src/hydra-queue-runner/builder.cc b/src/hydra-queue-runner/builder.cc new file mode 100644 index 00000000..540a61d6 --- /dev/null +++ b/src/hydra-queue-runner/builder.cc @@ -0,0 +1,378 @@ +#include + +#include "state.hh" +#include "build-result.hh" + +using namespace nix; + + +void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation) +{ + bool retry = true; + + MaintainCount mc(nrActiveSteps); + + try { + auto store = openStore(); // FIXME: pool + retry = doBuildStep(store, step, machine); + } catch (std::exception & e) { + printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") + % step->drvPath % machine->sshName % e.what()); + } + + /* Release the machine and wake up the dispatcher. */ + assert(reservation.unique()); + reservation = 0; + wakeDispatcher(); + + /* If there was a temporary failure, retry the step after an + exponentially increasing interval. */ + if (retry) { + { + auto step_(step->state.lock()); + step_->tries++; + nrRetries++; + if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic + int delta = retryInterval * powf(retryBackoff, step_->tries - 1); + printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta); + step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta); + } + + makeRunnable(step); + } +} + + +bool State::doBuildStep(std::shared_ptr store, Step::ptr step, + Machine::ptr machine) +{ + { + auto step_(step->state.lock()); + assert(step_->created); + assert(!step->finished); + } + + /* There can be any number of builds in the database that depend + on this derivation. Arbitrarily pick one (though preferring a + build of which this is the top-level derivation) for the + purpose of creating build steps. We could create a build step + record for every build, but that could be very expensive + (e.g. a stdenv derivation can be a dependency of tens of + thousands of builds), so we don't. */ + Build::ptr build; + + { + std::set dependents; + std::set steps; + getDependents(step, dependents, steps); + + if (dependents.empty()) { + /* Apparently all builds that depend on this derivation + are gone (e.g. cancelled). So don't bother. This is + very unlikely to happen, because normally Steps are + only kept alive by being reachable from a + Build. However, it's possible that a new Build just + created a reference to this step. So to handle that + possibility, we retry this step (putting it back in + the runnable queue). If there are really no strong + pointers to the step, it will be deleted. */ + printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath); + return true; + } + + for (auto build2 : dependents) + if (build2->drvPath == step->drvPath) { build = build2; break; } + + if (!build) build = *dependents.begin(); + + printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)") + % step->drvPath % machine->sshName % build->id % (dependents.size() - 1)); + } + + bool quit = build->id == buildOne; + + auto conn(dbPool.get()); + + RemoteResult result; + BuildOutput res; + int stepNr = 0; + + time_t stepStartTime = result.startTime = time(0); + + /* If any of the outputs have previously failed, then don't bother + building again. */ + bool cachedFailure = checkCachedFailure(step, *conn); + + if (cachedFailure) + result.status = BuildResult::CachedFailure; + else { + + /* Create a build step record indicating that we started + building. Also, mark the selected build as busy. */ + { + pqxx::work txn(*conn); + stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); + txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec(); + txn.commit(); + } + + /* Do the build. */ + try { + /* FIXME: referring builds may have conflicting timeouts. */ + buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); + } catch (Error & e) { + result.status = BuildResult::MiscFailure; + result.errorMsg = e.msg(); + } + + if (result.success()) res = getBuildOutput(store, step->drv); + } + + time_t stepStopTime = time(0); + if (!result.stopTime) result.stopTime = stepStopTime; + + /* Asynchronously compress the log. */ + if (result.logFile != "") { + { + auto logCompressorQueue_(logCompressorQueue.lock()); + logCompressorQueue_->push(result.logFile); + } + logCompressorWakeup.notify_one(); + } + + /* The step had a hopefully temporary failure (e.g. network + issue). Retry a number of times. */ + if (result.canRetry()) { + printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%") + % step->drvPath % machine->sshName % result.errorMsg); + bool retry; + { + auto step_(step->state.lock()); + retry = step_->tries + 1 < maxTries; + } + if (retry) { + pqxx::work txn(*conn); + finishBuildStep(txn, result.startTime, result.stopTime, build->id, + stepNr, machine->sshName, bssAborted, result.errorMsg); + txn.commit(); + if (quit) exit(1); + return true; + } + } + + if (result.success()) { + + /* Register success in the database for all Build objects that + have this step as the top-level step. Since the queue + monitor thread may be creating new referring Builds + concurrently, and updating the database may fail, we do + this in a loop, marking all known builds, repeating until + there are no unmarked builds. + */ + + std::vector buildIDs; + + while (true) { + + /* Get the builds that have this one as the top-level. */ + std::vector direct; + { + auto steps_(steps.lock()); + auto step_(step->state.lock()); + + for (auto & b_ : step_->builds) { + auto b = b_.lock(); + if (b && !b->finishedInDB) direct.push_back(b); + } + + /* If there are no builds left to update in the DB, + then we're done (except for calling + finishBuildStep()). Delete the step from + ‘steps’. Since we've been holding the ‘steps’ lock, + no new referrers can have been added in the + meantime or be added afterwards. */ + if (direct.empty()) { + printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath); + steps_->erase(step->drvPath); + } + } + + /* Update the database. */ + { + pqxx::work txn(*conn); + + finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess); + + for (auto & b : direct) + markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built, + result.startTime, result.stopTime); + + txn.commit(); + } + + if (direct.empty()) break; + + /* Remove the direct dependencies from ‘builds’. This will + cause them to be destroyed. */ + for (auto & b : direct) { + auto builds_(builds.lock()); + b->finishedInDB = true; + builds_->erase(b->id); + buildIDs.push_back(b->id); + } + } + + /* Send notification about the builds that have this step as + the top-level. */ + for (auto id : buildIDs) { + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(id, std::vector())); + } + notificationSenderWakeup.notify_one(); + } + + /* Wake up any dependent steps that have no other + dependencies. */ + { + auto step_(step->state.lock()); + for (auto & rdepWeak : step_->rdeps) { + auto rdep = rdepWeak.lock(); + if (!rdep) continue; + + bool runnable = false; + { + auto rdep_(rdep->state.lock()); + rdep_->deps.erase(step); + /* Note: if the step has not finished + initialisation yet, it will be made runnable in + createStep(), if appropriate. */ + if (rdep_->deps.empty() && rdep_->created) runnable = true; + } + + if (runnable) makeRunnable(rdep); + } + } + + } else { + + /* Register failure in the database for all Build objects that + directly or indirectly depend on this step. */ + + std::vector dependentIDs; + + while (true) { + + /* Get the builds and steps that depend on this step. */ + std::set indirect; + { + auto steps_(steps.lock()); + std::set steps; + getDependents(step, indirect, steps); + + /* If there are no builds left, delete all referring + steps from ‘steps’. As for the success case, we can + be certain no new referrers can be added. */ + if (indirect.empty()) { + for (auto & s : steps) { + printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath); + steps_->erase(s->drvPath); + } + break; + } + } + + /* Update the database. */ + { + pqxx::work txn(*conn); + + BuildStatus buildStatus = + result.status == BuildResult::TimedOut ? bsTimedOut : + result.canRetry() ? bsAborted : + bsFailed; + BuildStepStatus buildStepStatus = + result.status == BuildResult::TimedOut ? bssTimedOut : + result.canRetry() ? bssAborted : + bssFailed; + + /* For standard failures, we don't care about the error + message. */ + if (result.status == BuildResult::PermanentFailure || + result.status == BuildResult::TransientFailure || + result.status == BuildResult::CachedFailure || + result.status == BuildResult::TimedOut) + result.errorMsg = ""; + + /* Create failed build steps for every build that depends + on this. For cached failures, only create a step for + builds that don't have this step as top-level + (otherwise the user won't be able to see what caused + the build to fail). */ + for (auto & build2 : indirect) { + if ((cachedFailure && build2->drvPath == step->drvPath) || + (!cachedFailure && build == build2) || + build2->finishedInDB) + continue; + createBuildStep(txn, 0, build2, step, machine->sshName, + buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id); + } + + if (!cachedFailure) + finishBuildStep(txn, result.startTime, result.stopTime, build->id, + stepNr, machine->sshName, buildStepStatus, result.errorMsg); + + /* Mark all builds that depend on this derivation as failed. */ + for (auto & build2 : indirect) { + if (build2->finishedInDB) continue; + printMsg(lvlError, format("marking build %1% as failed") % build2->id); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0") + (build2->id) + ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus)) + (result.startTime) + (result.stopTime) + (cachedFailure ? 1 : 0).exec(); + nrBuildsDone++; + } + + /* Remember failed paths in the database so that they + won't be built again. */ + if (!cachedFailure && result.status == BuildResult::PermanentFailure) + for (auto & path : outputPaths(step->drv)) + txn.parameterized("insert into FailedPaths values ($1)")(path).exec(); + + txn.commit(); + } + + /* Remove the indirect dependencies from ‘builds’. This + will cause them to be destroyed. */ + for (auto & b : indirect) { + auto builds_(builds.lock()); + b->finishedInDB = true; + builds_->erase(b->id); + dependentIDs.push_back(b->id); + if (buildOne == b->id) quit = true; + } + } + + /* Send notification about this build and its dependents. */ + { + auto notificationSenderQueue_(notificationSenderQueue.lock()); + notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs)); + } + notificationSenderWakeup.notify_one(); + + } + + // FIXME: keep stats about aborted steps? + nrStepsDone++; + totalStepTime += stepStopTime - stepStartTime; + totalStepBuildTime += result.stopTime - result.startTime; + machine->state->nrStepsDone++; + machine->state->totalStepTime += stepStopTime - stepStartTime; + machine->state->totalStepBuildTime += result.stopTime - result.startTime; + + if (quit) exit(0); // testing hack + + return false; +} diff --git a/src/hydra-queue-runner/dispatcher.cc b/src/hydra-queue-runner/dispatcher.cc new file mode 100644 index 00000000..bd4db86d --- /dev/null +++ b/src/hydra-queue-runner/dispatcher.cc @@ -0,0 +1,155 @@ +#include +#include + +#include "state.hh" + +using namespace nix; + + +void State::makeRunnable(Step::ptr step) +{ + printMsg(lvlChatty, format("step ‘%1%’ is now runnable") % step->drvPath); + + { + auto step_(step->state.lock()); + assert(step_->created); + assert(!step->finished); + assert(step_->deps.empty()); + } + + { + auto runnable_(runnable.lock()); + runnable_->push_back(step); + } + + wakeDispatcher(); +} + + +void State::dispatcher() +{ + while (true) { + printMsg(lvlDebug, "dispatcher woken up"); + + auto sleepUntil = system_time::max(); + + bool keepGoing; + + do { + /* Copy the currentJobs field of each machine. This is + necessary to ensure that the sort comparator below is + an ordering. std::sort() can segfault if it isn't. */ + struct MachineInfo + { + Machine::ptr machine; + unsigned int currentJobs; + }; + std::vector machinesSorted; + { + auto machines_(machines.lock()); + for (auto & m : *machines_) + machinesSorted.push_back({m.second, m.second->state->currentJobs}); + } + + /* Sort the machines by a combination of speed factor and + available slots. Prioritise the available machines as + follows: + + - First by load divided by speed factor, rounded to the + nearest integer. This causes fast machines to be + preferred over slow machines with similar loads. + + - Then by speed factor. + + - Finally by load. */ + sort(machinesSorted.begin(), machinesSorted.end(), + [](const MachineInfo & a, const MachineInfo & b) -> bool + { + float ta = roundf(a.currentJobs / a.machine->speedFactor); + float tb = roundf(b.currentJobs / b.machine->speedFactor); + return + ta != tb ? ta < tb : + a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor : + a.currentJobs > b.currentJobs; + }); + + /* Find a machine with a free slot and find a step to run + on it. Once we find such a pair, we restart the outer + loop because the machine sorting will have changed. */ + keepGoing = false; + system_time now = std::chrono::system_clock::now(); + + for (auto & mi : machinesSorted) { + // FIXME: can we lose a wakeup if a builder exits concurrently? + if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; + + auto runnable_(runnable.lock()); + //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); + + /* FIXME: we're holding the runnable lock too long + here. This could be more efficient. */ + + for (auto i = runnable_->begin(); i != runnable_->end(); ) { + auto step = i->lock(); + + /* Delete dead steps. */ + if (!step) { + i = runnable_->erase(i); + continue; + } + + /* Can this machine do this step? */ + if (!mi.machine->supportsStep(step)) { + ++i; + continue; + } + + /* Skip previously failed steps that aren't ready + to be retried. */ + { + auto step_(step->state.lock()); + if (step_->tries > 0 && step_->after > now) { + if (step_->after < sleepUntil) + sleepUntil = step_->after; + ++i; + continue; + } + } + + /* Make a slot reservation and start a thread to + do the build. */ + auto reservation = std::make_shared(mi.machine->state->currentJobs); + i = runnable_->erase(i); + + auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation); + builderThread.detach(); // FIXME? + + keepGoing = true; + break; + } + + if (keepGoing) break; + } + + } while (keepGoing); + + /* Sleep until we're woken up (either because a runnable build + is added, or because a build finishes). */ + { + std::unique_lock lock(dispatcherMutex); + printMsg(lvlDebug, format("dispatcher sleeping for %1%s") % + std::chrono::duration_cast(sleepUntil - std::chrono::system_clock::now()).count()); + dispatcherWakeup.wait_until(lock, sleepUntil); + nrDispatcherWakeups++; + } + } + + printMsg(lvlError, "dispatcher exits"); +} + + +void State::wakeDispatcher() +{ + { std::lock_guard lock(dispatcherMutex); } // barrier + dispatcherWakeup.notify_one(); +} diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 21b1f65b..2860b854 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -1,14 +1,12 @@ #include #include -#include -#include #include #include #include -#include "build-result.hh" #include "state.hh" +#include "build-result.hh" #include "shared.hh" #include "globals.hh" @@ -17,20 +15,6 @@ using namespace nix; -// FIXME: Make configurable. -const unsigned int maxTries = 5; -const unsigned int retryInterval = 60; // seconds -const float retryBackoff = 3.0; -const unsigned int maxParallelCopyClosure = 4; - - -template -bool has(const C & c, const V & v) -{ - return c.find(v) != c.end(); -} - - State::State() { hydraData = getEnv("HYDRA_DATA"); @@ -186,371 +170,6 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, } -void State::queueMonitor() -{ - while (true) { - try { - queueMonitorLoop(); - } catch (std::exception & e) { - printMsg(lvlError, format("queue monitor: %1%") % e.what()); - sleep(10); // probably a DB problem, so don't retry right away - } - } -} - - -void State::queueMonitorLoop() -{ - auto conn(dbPool.get()); - - receiver buildsAdded(*conn, "builds_added"); - receiver buildsRestarted(*conn, "builds_restarted"); - receiver buildsCancelled(*conn, "builds_cancelled"); - receiver buildsDeleted(*conn, "builds_deleted"); - - auto store = openStore(); // FIXME: pool - - unsigned int lastBuildId = 0; - - while (true) { - getQueuedBuilds(*conn, store, lastBuildId); - - /* Sleep until we get notification from the database about an - event. */ - conn->await_notification(); - nrQueueWakeups++; - - if (buildsAdded.get()) - printMsg(lvlTalkative, "got notification: new builds added to the queue"); - if (buildsRestarted.get()) { - printMsg(lvlTalkative, "got notification: builds restarted"); - lastBuildId = 0; // check all builds - } - if (buildsCancelled.get() || buildsDeleted.get()) { - printMsg(lvlTalkative, "got notification: builds cancelled"); - removeCancelledBuilds(*conn); - } - - } -} - - -void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) -{ - printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); - - /* Grab the queued builds from the database, but don't process - them yet (since we don't want a long-running transaction). */ - std::multimap newBuilds; - - { - pqxx::work txn(conn); - - auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); - - for (auto const & row : res) { - auto builds_(builds.lock()); - BuildID id = row["id"].as(); - if (buildOne && id != buildOne) continue; - if (id > lastBuildId) lastBuildId = id; - if (has(*builds_, id)) continue; - - auto build = std::make_shared(); - build->id = id; - build->drvPath = row["drvPath"].as(); - build->fullJobName = row["project"].as() + ":" + row["jobset"].as() + ":" + row["job"].as(); - build->maxSilentTime = row["maxsilent"].as(); - build->buildTimeout = row["timeout"].as(); - - newBuilds.emplace(std::make_pair(build->drvPath, build)); - } - } - - std::set newRunnable; - unsigned int nrAdded; - std::function createBuild; - - createBuild = [&](Build::ptr build) { - printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName); - nrAdded++; - - if (!store->isValidPath(build->drvPath)) { - /* Derivation has been GC'ed prematurely. */ - printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); - if (!build->finishedInDB) { - pqxx::work txn(conn); - txn.parameterized - ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") - (build->id) - ((int) bsAborted) - (time(0)) - ("derivation was garbage-collected prior to build").exec(); - txn.commit(); - build->finishedInDB = true; - nrBuildsDone++; - } - return; - } - - std::set newSteps; - std::set finishedDrvs; // FIXME: re-use? - Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); - - /* Some of the new steps may be the top level of builds that - we haven't processed yet. So do them now. This ensures that - if build A depends on build B with top-level step X, then X - will be "accounted" to B in doBuildStep(). */ - for (auto & r : newSteps) { - while (true) { - auto i = newBuilds.find(r->drvPath); - if (i == newBuilds.end()) break; - Build::ptr b = i->second; - newBuilds.erase(i); - createBuild(b); - } - } - - /* If we didn't get a step, it means the step's outputs are - all valid. So we mark this as a finished, cached build. */ - if (!step) { - Derivation drv = readDerivation(build->drvPath); - BuildOutput res = getBuildOutput(store, drv); - - pqxx::work txn(conn); - time_t now = time(0); - markSucceededBuild(txn, build, res, true, now, now); - txn.commit(); - - build->finishedInDB = true; - - return; - } - - /* If any step has an unsupported system type or has a - previously failed output path, then fail the build right - away. */ - bool badStep = false; - for (auto & r : newSteps) { - BuildStatus buildStatus = bsSuccess; - BuildStepStatus buildStepStatus = bssFailed; - - if (checkCachedFailure(r, conn)) { - printMsg(lvlError, format("marking build %1% as cached failure") % build->id); - buildStatus = step == r ? bsFailed : bsDepFailed; - buildStepStatus = bssFailed; - } - - if (buildStatus == bsSuccess) { - bool supported = false; - { - auto machines_(machines.lock()); // FIXME: use shared_mutex - for (auto & m : *machines_) - if (m.second->supportsStep(r)) { supported = true; break; } - } - - if (!supported) { - printMsg(lvlError, format("aborting unsupported build %1%") % build->id); - buildStatus = bsUnsupported; - buildStepStatus = bssUnsupported; - } - } - - if (buildStatus != bsSuccess) { - time_t now = time(0); - if (!build->finishedInDB) { - pqxx::work txn(conn); - createBuildStep(txn, 0, build, r, "", buildStepStatus); - txn.parameterized - ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0") - (build->id) - ((int) buildStatus) - (now) - (buildStatus != bsUnsupported ? 1 : 0).exec(); - txn.commit(); - build->finishedInDB = true; - nrBuildsDone++; - } - badStep = true; - break; - } - } - - if (badStep) return; - - /* Note: if we exit this scope prior to this, the build and - all newly created steps are destroyed. */ - - { - auto builds_(builds.lock()); - if (!build->finishedInDB) // FIXME: can this happen? - (*builds_)[build->id] = build; - build->toplevel = step; - } - - printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") - % build->id % step->drvPath % newSteps.size()); - }; - - /* Now instantiate build steps for each new build. The builder - threads can start building the runnable build steps right away, - even while we're still processing other new builds. */ - while (!newBuilds.empty()) { - auto build = newBuilds.begin()->second; - newBuilds.erase(newBuilds.begin()); - - newRunnable.clear(); - nrAdded = 0; - try { - createBuild(build); - } catch (Error & e) { - e.addPrefix(format("while loading build %1%: ") % build->id); - throw; - } - - /* Add the new runnable build steps to ‘runnable’ and wake up - the builder threads. */ - printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded); - for (auto & r : newRunnable) - makeRunnable(r); - - nrBuildsRead += nrAdded; - } -} - - -void State::removeCancelledBuilds(Connection & conn) -{ - /* Get the current set of queued builds. */ - std::set currentIds; - { - pqxx::work txn(conn); - auto res = txn.exec("select id from Builds where finished = 0"); - for (auto const & row : res) - currentIds.insert(row["id"].as()); - } - - auto builds_(builds.lock()); - - for (auto i = builds_->begin(); i != builds_->end(); ) { - if (currentIds.find(i->first) == currentIds.end()) { - printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); - i = builds_->erase(i); - // FIXME: ideally we would interrupt active build steps here. - } else - ++i; - } -} - - -Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath, - Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, - std::set & newSteps, std::set & newRunnable) -{ - if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0; - - /* Check if the requested step already exists. If not, create a - new step. In any case, make the step reachable from - referringBuild or referringStep. This is done atomically (with - ‘steps’ locked), to ensure that this step can never become - reachable from a new build after doBuildStep has removed it - from ‘steps’. */ - Step::ptr step; - bool isNew = false; - { - auto steps_(steps.lock()); - - /* See if the step already exists in ‘steps’ and is not - stale. */ - auto prev = steps_->find(drvPath); - if (prev != steps_->end()) { - step = prev->second.lock(); - /* Since ‘step’ is a strong pointer, the referred Step - object won't be deleted after this. */ - if (!step) steps_->erase(drvPath); // remove stale entry - } - - /* If it doesn't exist, create it. */ - if (!step) { - step = std::make_shared(); - step->drvPath = drvPath; - isNew = true; - } - - auto step_(step->state.lock()); - - assert(step_->created != isNew); - - if (referringBuild) - step_->builds.push_back(referringBuild); - - if (referringStep) - step_->rdeps.push_back(referringStep); - - (*steps_)[drvPath] = step; - } - - if (!isNew) return step; - - printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath); - - /* Initialize the step. Note that the step may be visible in - ‘steps’ before this point, but that doesn't matter because - it's not runnable yet, and other threads won't make it - runnable while step->created == false. */ - step->drv = readDerivation(drvPath); - { - auto i = step->drv.env.find("requiredSystemFeatures"); - if (i != step->drv.env.end()) - step->requiredSystemFeatures = tokenizeString>(i->second); - } - - auto attr = step->drv.env.find("preferLocalBuild"); - step->preferLocalBuild = - attr != step->drv.env.end() && attr->second == "1" - && has(localPlatforms, step->drv.platform); - - /* Are all outputs valid? */ - bool valid = true; - for (auto & i : step->drv.outputs) { - if (!store->isValidPath(i.second.path)) { - valid = false; - break; - } - } - - // FIXME: check whether all outputs are in the binary cache. - if (valid) { - finishedDrvs.insert(drvPath); - return 0; - } - - /* No, we need to build. */ - printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath); - newSteps.insert(step); - - /* Create steps for the dependencies. */ - for (auto & i : step->drv.inputDrvs) { - auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable); - if (dep) { - auto step_(step->state.lock()); - step_->deps.insert(dep); - } - } - - /* If the step has no (remaining) dependencies, make it - runnable. */ - { - auto step_(step->state.lock()); - assert(!step_->created); - step_->created = true; - if (step_->deps.empty()) - newRunnable.insert(step); - } - - return step; -} - - /* Get the steps and unfinished builds that depend on the given step. */ void getDependents(Step::ptr step, std::set & builds, std::set & steps) { @@ -585,527 +204,6 @@ void getDependents(Step::ptr step, std::set & builds, std::setdrvPath); - - { - auto step_(step->state.lock()); - assert(step_->created); - assert(!step->finished); - assert(step_->deps.empty()); - } - - { - auto runnable_(runnable.lock()); - runnable_->push_back(step); - } - - wakeDispatcher(); -} - - -void State::dispatcher() -{ - while (true) { - printMsg(lvlDebug, "dispatcher woken up"); - - auto sleepUntil = system_time::max(); - - bool keepGoing; - - do { - /* Copy the currentJobs field of each machine. This is - necessary to ensure that the sort comparator below is - an ordering. std::sort() can segfault if it isn't. */ - struct MachineInfo - { - Machine::ptr machine; - unsigned int currentJobs; - }; - std::vector machinesSorted; - { - auto machines_(machines.lock()); - for (auto & m : *machines_) - machinesSorted.push_back({m.second, m.second->state->currentJobs}); - } - - /* Sort the machines by a combination of speed factor and - available slots. Prioritise the available machines as - follows: - - - First by load divided by speed factor, rounded to the - nearest integer. This causes fast machines to be - preferred over slow machines with similar loads. - - - Then by speed factor. - - - Finally by load. */ - sort(machinesSorted.begin(), machinesSorted.end(), - [](const MachineInfo & a, const MachineInfo & b) -> bool - { - float ta = roundf(a.currentJobs / a.machine->speedFactor); - float tb = roundf(b.currentJobs / b.machine->speedFactor); - return - ta != tb ? ta < tb : - a.machine->speedFactor != b.machine->speedFactor ? a.machine->speedFactor > b.machine->speedFactor : - a.currentJobs > b.currentJobs; - }); - - /* Find a machine with a free slot and find a step to run - on it. Once we find such a pair, we restart the outer - loop because the machine sorting will have changed. */ - keepGoing = false; - system_time now = std::chrono::system_clock::now(); - - for (auto & mi : machinesSorted) { - // FIXME: can we lose a wakeup if a builder exits concurrently? - if (mi.machine->state->currentJobs >= mi.machine->maxJobs) continue; - - auto runnable_(runnable.lock()); - //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); - - /* FIXME: we're holding the runnable lock too long - here. This could be more efficient. */ - - for (auto i = runnable_->begin(); i != runnable_->end(); ) { - auto step = i->lock(); - - /* Delete dead steps. */ - if (!step) { - i = runnable_->erase(i); - continue; - } - - /* Can this machine do this step? */ - if (!mi.machine->supportsStep(step)) { - ++i; - continue; - } - - /* Skip previously failed steps that aren't ready - to be retried. */ - { - auto step_(step->state.lock()); - if (step_->tries > 0 && step_->after > now) { - if (step_->after < sleepUntil) - sleepUntil = step_->after; - ++i; - continue; - } - } - - /* Make a slot reservation and start a thread to - do the build. */ - auto reservation = std::make_shared(mi.machine->state->currentJobs); - i = runnable_->erase(i); - - auto builderThread = std::thread(&State::builder, this, step, mi.machine, reservation); - builderThread.detach(); // FIXME? - - keepGoing = true; - break; - } - - if (keepGoing) break; - } - - } while (keepGoing); - - /* Sleep until we're woken up (either because a runnable build - is added, or because a build finishes). */ - { - std::unique_lock lock(dispatcherMutex); - printMsg(lvlDebug, format("dispatcher sleeping for %1%s") % - std::chrono::duration_cast(sleepUntil - std::chrono::system_clock::now()).count()); - dispatcherWakeup.wait_until(lock, sleepUntil); - nrDispatcherWakeups++; - } - } - - printMsg(lvlError, "dispatcher exits"); -} - - -void State::wakeDispatcher() -{ - { std::lock_guard lock(dispatcherMutex); } // barrier - dispatcherWakeup.notify_one(); -} - - -void State::builder(Step::ptr step, Machine::ptr machine, std::shared_ptr reservation) -{ - bool retry = true; - - MaintainCount mc(nrActiveSteps); - - try { - auto store = openStore(); // FIXME: pool - retry = doBuildStep(store, step, machine); - } catch (std::exception & e) { - printMsg(lvlError, format("uncaught exception building ‘%1%’ on ‘%2%’: %3%") - % step->drvPath % machine->sshName % e.what()); - } - - /* Release the machine and wake up the dispatcher. */ - assert(reservation.unique()); - reservation = 0; - wakeDispatcher(); - - /* If there was a temporary failure, retry the step after an - exponentially increasing interval. */ - if (retry) { - { - auto step_(step->state.lock()); - step_->tries++; - nrRetries++; - if (step_->tries > maxNrRetries) maxNrRetries = step_->tries; // yeah yeah, not atomic - int delta = retryInterval * powf(retryBackoff, step_->tries - 1); - printMsg(lvlInfo, format("will retry ‘%1%’ after %2%s") % step->drvPath % delta); - step_->after = std::chrono::system_clock::now() + std::chrono::seconds(delta); - } - - makeRunnable(step); - } -} - - -bool State::doBuildStep(std::shared_ptr store, Step::ptr step, - Machine::ptr machine) -{ - { - auto step_(step->state.lock()); - assert(step_->created); - assert(!step->finished); - } - - /* There can be any number of builds in the database that depend - on this derivation. Arbitrarily pick one (though preferring a - build of which this is the top-level derivation) for the - purpose of creating build steps. We could create a build step - record for every build, but that could be very expensive - (e.g. a stdenv derivation can be a dependency of tens of - thousands of builds), so we don't. */ - Build::ptr build; - - { - std::set dependents; - std::set steps; - getDependents(step, dependents, steps); - - if (dependents.empty()) { - /* Apparently all builds that depend on this derivation - are gone (e.g. cancelled). So don't bother. This is - very unlikely to happen, because normally Steps are - only kept alive by being reachable from a - Build. However, it's possible that a new Build just - created a reference to this step. So to handle that - possibility, we retry this step (putting it back in - the runnable queue). If there are really no strong - pointers to the step, it will be deleted. */ - printMsg(lvlInfo, format("maybe cancelling build step ‘%1%’") % step->drvPath); - return true; - } - - for (auto build2 : dependents) - if (build2->drvPath == step->drvPath) { build = build2; break; } - - if (!build) build = *dependents.begin(); - - printMsg(lvlInfo, format("performing step ‘%1%’ on ‘%2%’ (needed by build %3% and %4% others)") - % step->drvPath % machine->sshName % build->id % (dependents.size() - 1)); - } - - bool quit = build->id == buildOne; - - auto conn(dbPool.get()); - - RemoteResult result; - BuildOutput res; - int stepNr = 0; - - time_t stepStartTime = result.startTime = time(0); - - /* If any of the outputs have previously failed, then don't bother - building again. */ - bool cachedFailure = checkCachedFailure(step, *conn); - - if (cachedFailure) - result.status = BuildResult::CachedFailure; - else { - - /* Create a build step record indicating that we started - building. Also, mark the selected build as busy. */ - { - pqxx::work txn(*conn); - stepNr = createBuildStep(txn, result.startTime, build, step, machine->sshName, bssBusy); - txn.parameterized("update Builds set busy = 1 where id = $1")(build->id).exec(); - txn.commit(); - } - - /* Do the build. */ - try { - /* FIXME: referring builds may have conflicting timeouts. */ - buildRemote(store, machine, step, build->maxSilentTime, build->buildTimeout, result); - } catch (Error & e) { - result.status = BuildResult::MiscFailure; - result.errorMsg = e.msg(); - } - - if (result.success()) res = getBuildOutput(store, step->drv); - } - - time_t stepStopTime = time(0); - if (!result.stopTime) result.stopTime = stepStopTime; - - /* Asynchronously compress the log. */ - if (result.logFile != "") { - { - auto logCompressorQueue_(logCompressorQueue.lock()); - logCompressorQueue_->push(result.logFile); - } - logCompressorWakeup.notify_one(); - } - - /* The step had a hopefully temporary failure (e.g. network - issue). Retry a number of times. */ - if (result.canRetry()) { - printMsg(lvlError, format("possibly transient failure building ‘%1%’ on ‘%2%’: %3%") - % step->drvPath % machine->sshName % result.errorMsg); - bool retry; - { - auto step_(step->state.lock()); - retry = step_->tries + 1 < maxTries; - } - if (retry) { - pqxx::work txn(*conn); - finishBuildStep(txn, result.startTime, result.stopTime, build->id, - stepNr, machine->sshName, bssAborted, result.errorMsg); - txn.commit(); - if (quit) exit(1); - return true; - } - } - - if (result.success()) { - - /* Register success in the database for all Build objects that - have this step as the top-level step. Since the queue - monitor thread may be creating new referring Builds - concurrently, and updating the database may fail, we do - this in a loop, marking all known builds, repeating until - there are no unmarked builds. - */ - - std::vector buildIDs; - - while (true) { - - /* Get the builds that have this one as the top-level. */ - std::vector direct; - { - auto steps_(steps.lock()); - auto step_(step->state.lock()); - - for (auto & b_ : step_->builds) { - auto b = b_.lock(); - if (b && !b->finishedInDB) direct.push_back(b); - } - - /* If there are no builds left to update in the DB, - then we're done (except for calling - finishBuildStep()). Delete the step from - ‘steps’. Since we've been holding the ‘steps’ lock, - no new referrers can have been added in the - meantime or be added afterwards. */ - if (direct.empty()) { - printMsg(lvlDebug, format("finishing build step ‘%1%’") % step->drvPath); - steps_->erase(step->drvPath); - } - } - - /* Update the database. */ - { - pqxx::work txn(*conn); - - finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess); - - for (auto & b : direct) - markSucceededBuild(txn, b, res, build != b || result.status != BuildResult::Built, - result.startTime, result.stopTime); - - txn.commit(); - } - - if (direct.empty()) break; - - /* Remove the direct dependencies from ‘builds’. This will - cause them to be destroyed. */ - for (auto & b : direct) { - auto builds_(builds.lock()); - b->finishedInDB = true; - builds_->erase(b->id); - buildIDs.push_back(b->id); - } - } - - /* Send notification about the builds that have this step as - the top-level. */ - for (auto id : buildIDs) { - { - auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem(id, std::vector())); - } - notificationSenderWakeup.notify_one(); - } - - /* Wake up any dependent steps that have no other - dependencies. */ - { - auto step_(step->state.lock()); - for (auto & rdepWeak : step_->rdeps) { - auto rdep = rdepWeak.lock(); - if (!rdep) continue; - - bool runnable = false; - { - auto rdep_(rdep->state.lock()); - rdep_->deps.erase(step); - /* Note: if the step has not finished - initialisation yet, it will be made runnable in - createStep(), if appropriate. */ - if (rdep_->deps.empty() && rdep_->created) runnable = true; - } - - if (runnable) makeRunnable(rdep); - } - } - - } else { - - /* Register failure in the database for all Build objects that - directly or indirectly depend on this step. */ - - std::vector dependentIDs; - - while (true) { - - /* Get the builds and steps that depend on this step. */ - std::set indirect; - { - auto steps_(steps.lock()); - std::set steps; - getDependents(step, indirect, steps); - - /* If there are no builds left, delete all referring - steps from ‘steps’. As for the success case, we can - be certain no new referrers can be added. */ - if (indirect.empty()) { - for (auto & s : steps) { - printMsg(lvlDebug, format("finishing build step ‘%1%’") % s->drvPath); - steps_->erase(s->drvPath); - } - break; - } - } - - /* Update the database. */ - { - pqxx::work txn(*conn); - - BuildStatus buildStatus = - result.status == BuildResult::TimedOut ? bsTimedOut : - result.canRetry() ? bsAborted : - bsFailed; - BuildStepStatus buildStepStatus = - result.status == BuildResult::TimedOut ? bssTimedOut : - result.canRetry() ? bssAborted : - bssFailed; - - /* For standard failures, we don't care about the error - message. */ - if (result.status == BuildResult::PermanentFailure || - result.status == BuildResult::TransientFailure || - result.status == BuildResult::CachedFailure || - result.status == BuildResult::TimedOut) - result.errorMsg = ""; - - /* Create failed build steps for every build that depends - on this. For cached failures, only create a step for - builds that don't have this step as top-level - (otherwise the user won't be able to see what caused - the build to fail). */ - for (auto & build2 : indirect) { - if ((cachedFailure && build2->drvPath == step->drvPath) || - (!cachedFailure && build == build2) || - build2->finishedInDB) - continue; - createBuildStep(txn, 0, build2, step, machine->sshName, - buildStepStatus, result.errorMsg, build == build2 ? 0 : build->id); - } - - if (!cachedFailure) - finishBuildStep(txn, result.startTime, result.stopTime, build->id, - stepNr, machine->sshName, buildStepStatus, result.errorMsg); - - /* Mark all builds that depend on this derivation as failed. */ - for (auto & build2 : indirect) { - if (build2->finishedInDB) continue; - printMsg(lvlError, format("marking build %1% as failed") % build2->id); - txn.parameterized - ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1 and finished = 0") - (build2->id) - ((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus)) - (result.startTime) - (result.stopTime) - (cachedFailure ? 1 : 0).exec(); - nrBuildsDone++; - } - - /* Remember failed paths in the database so that they - won't be built again. */ - if (!cachedFailure && result.status == BuildResult::PermanentFailure) - for (auto & path : outputPaths(step->drv)) - txn.parameterized("insert into FailedPaths values ($1)")(path).exec(); - - txn.commit(); - } - - /* Remove the indirect dependencies from ‘builds’. This - will cause them to be destroyed. */ - for (auto & b : indirect) { - auto builds_(builds.lock()); - b->finishedInDB = true; - builds_->erase(b->id); - dependentIDs.push_back(b->id); - if (buildOne == b->id) quit = true; - } - } - - /* Send notification about this build and its dependents. */ - { - auto notificationSenderQueue_(notificationSenderQueue.lock()); - notificationSenderQueue_->push(NotificationItem(build->id, dependentIDs)); - } - notificationSenderWakeup.notify_one(); - - } - - // FIXME: keep stats about aborted steps? - nrStepsDone++; - totalStepTime += stepStopTime - stepStartTime; - totalStepBuildTime += result.stopTime - result.startTime; - machine->state->nrStepsDone++; - machine->state->totalStepTime += stepStopTime - stepStartTime; - machine->state->totalStepBuildTime += result.stopTime - result.startTime; - - if (quit) exit(0); // testing hack - - return false; -} - - void State::markSucceededBuild(pqxx::work & txn, Build::ptr build, const BuildOutput & res, bool isCachedBuild, time_t startTime, time_t stopTime) { diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc new file mode 100644 index 00000000..bd9b2aef --- /dev/null +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -0,0 +1,369 @@ +#include "state.hh" +#include "build-result.hh" + +using namespace nix; + + +void State::queueMonitor() +{ + while (true) { + try { + queueMonitorLoop(); + } catch (std::exception & e) { + printMsg(lvlError, format("queue monitor: %1%") % e.what()); + sleep(10); // probably a DB problem, so don't retry right away + } + } +} + + +void State::queueMonitorLoop() +{ + auto conn(dbPool.get()); + + receiver buildsAdded(*conn, "builds_added"); + receiver buildsRestarted(*conn, "builds_restarted"); + receiver buildsCancelled(*conn, "builds_cancelled"); + receiver buildsDeleted(*conn, "builds_deleted"); + + auto store = openStore(); // FIXME: pool + + unsigned int lastBuildId = 0; + + while (true) { + getQueuedBuilds(*conn, store, lastBuildId); + + /* Sleep until we get notification from the database about an + event. */ + conn->await_notification(); + nrQueueWakeups++; + + if (buildsAdded.get()) + printMsg(lvlTalkative, "got notification: new builds added to the queue"); + if (buildsRestarted.get()) { + printMsg(lvlTalkative, "got notification: builds restarted"); + lastBuildId = 0; // check all builds + } + if (buildsCancelled.get() || buildsDeleted.get()) { + printMsg(lvlTalkative, "got notification: builds cancelled"); + removeCancelledBuilds(*conn); + } + + } +} + + +void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) +{ + printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); + + /* Grab the queued builds from the database, but don't process + them yet (since we don't want a long-running transaction). */ + std::multimap newBuilds; + + { + pqxx::work txn(conn); + + auto res = txn.parameterized("select id, project, jobset, job, drvPath, maxsilent, timeout from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); + + for (auto const & row : res) { + auto builds_(builds.lock()); + BuildID id = row["id"].as(); + if (buildOne && id != buildOne) continue; + if (id > lastBuildId) lastBuildId = id; + if (has(*builds_, id)) continue; + + auto build = std::make_shared(); + build->id = id; + build->drvPath = row["drvPath"].as(); + build->fullJobName = row["project"].as() + ":" + row["jobset"].as() + ":" + row["job"].as(); + build->maxSilentTime = row["maxsilent"].as(); + build->buildTimeout = row["timeout"].as(); + + newBuilds.emplace(std::make_pair(build->drvPath, build)); + } + } + + std::set newRunnable; + unsigned int nrAdded; + std::function createBuild; + + createBuild = [&](Build::ptr build) { + printMsg(lvlTalkative, format("loading build %1% (%2%)") % build->id % build->fullJobName); + nrAdded++; + + if (!store->isValidPath(build->drvPath)) { + /* Derivation has been GC'ed prematurely. */ + printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); + if (!build->finishedInDB) { + pqxx::work txn(conn); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1 and finished = 0") + (build->id) + ((int) bsAborted) + (time(0)) + ("derivation was garbage-collected prior to build").exec(); + txn.commit(); + build->finishedInDB = true; + nrBuildsDone++; + } + return; + } + + std::set newSteps; + std::set finishedDrvs; // FIXME: re-use? + Step::ptr step = createStep(store, build->drvPath, build, 0, finishedDrvs, newSteps, newRunnable); + + /* Some of the new steps may be the top level of builds that + we haven't processed yet. So do them now. This ensures that + if build A depends on build B with top-level step X, then X + will be "accounted" to B in doBuildStep(). */ + for (auto & r : newSteps) { + while (true) { + auto i = newBuilds.find(r->drvPath); + if (i == newBuilds.end()) break; + Build::ptr b = i->second; + newBuilds.erase(i); + createBuild(b); + } + } + + /* If we didn't get a step, it means the step's outputs are + all valid. So we mark this as a finished, cached build. */ + if (!step) { + Derivation drv = readDerivation(build->drvPath); + BuildOutput res = getBuildOutput(store, drv); + + pqxx::work txn(conn); + time_t now = time(0); + markSucceededBuild(txn, build, res, true, now, now); + txn.commit(); + + build->finishedInDB = true; + + return; + } + + /* If any step has an unsupported system type or has a + previously failed output path, then fail the build right + away. */ + bool badStep = false; + for (auto & r : newSteps) { + BuildStatus buildStatus = bsSuccess; + BuildStepStatus buildStepStatus = bssFailed; + + if (checkCachedFailure(r, conn)) { + printMsg(lvlError, format("marking build %1% as cached failure") % build->id); + buildStatus = step == r ? bsFailed : bsDepFailed; + buildStepStatus = bssFailed; + } + + if (buildStatus == bsSuccess) { + bool supported = false; + { + auto machines_(machines.lock()); // FIXME: use shared_mutex + for (auto & m : *machines_) + if (m.second->supportsStep(r)) { supported = true; break; } + } + + if (!supported) { + printMsg(lvlError, format("aborting unsupported build %1%") % build->id); + buildStatus = bsUnsupported; + buildStepStatus = bssUnsupported; + } + } + + if (buildStatus != bsSuccess) { + time_t now = time(0); + if (!build->finishedInDB) { + pqxx::work txn(conn); + createBuildStep(txn, 0, build, r, "", buildStepStatus); + txn.parameterized + ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1 and finished = 0") + (build->id) + ((int) buildStatus) + (now) + (buildStatus != bsUnsupported ? 1 : 0).exec(); + txn.commit(); + build->finishedInDB = true; + nrBuildsDone++; + } + badStep = true; + break; + } + } + + if (badStep) return; + + /* Note: if we exit this scope prior to this, the build and + all newly created steps are destroyed. */ + + { + auto builds_(builds.lock()); + if (!build->finishedInDB) // FIXME: can this happen? + (*builds_)[build->id] = build; + build->toplevel = step; + } + + printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") + % build->id % step->drvPath % newSteps.size()); + }; + + /* Now instantiate build steps for each new build. The builder + threads can start building the runnable build steps right away, + even while we're still processing other new builds. */ + while (!newBuilds.empty()) { + auto build = newBuilds.begin()->second; + newBuilds.erase(newBuilds.begin()); + + newRunnable.clear(); + nrAdded = 0; + try { + createBuild(build); + } catch (Error & e) { + e.addPrefix(format("while loading build %1%: ") % build->id); + throw; + } + + /* Add the new runnable build steps to ‘runnable’ and wake up + the builder threads. */ + printMsg(lvlChatty, format("got %1% new runnable steps from %2% new builds") % newRunnable.size() % nrAdded); + for (auto & r : newRunnable) + makeRunnable(r); + + nrBuildsRead += nrAdded; + } +} + + +void State::removeCancelledBuilds(Connection & conn) +{ + /* Get the current set of queued builds. */ + std::set currentIds; + { + pqxx::work txn(conn); + auto res = txn.exec("select id from Builds where finished = 0"); + for (auto const & row : res) + currentIds.insert(row["id"].as()); + } + + auto builds_(builds.lock()); + + for (auto i = builds_->begin(); i != builds_->end(); ) { + if (currentIds.find(i->first) == currentIds.end()) { + printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); + i = builds_->erase(i); + // FIXME: ideally we would interrupt active build steps here. + } else + ++i; + } +} + + +Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath, + Build::ptr referringBuild, Step::ptr referringStep, std::set & finishedDrvs, + std::set & newSteps, std::set & newRunnable) +{ + if (finishedDrvs.find(drvPath) != finishedDrvs.end()) return 0; + + /* Check if the requested step already exists. If not, create a + new step. In any case, make the step reachable from + referringBuild or referringStep. This is done atomically (with + ‘steps’ locked), to ensure that this step can never become + reachable from a new build after doBuildStep has removed it + from ‘steps’. */ + Step::ptr step; + bool isNew = false; + { + auto steps_(steps.lock()); + + /* See if the step already exists in ‘steps’ and is not + stale. */ + auto prev = steps_->find(drvPath); + if (prev != steps_->end()) { + step = prev->second.lock(); + /* Since ‘step’ is a strong pointer, the referred Step + object won't be deleted after this. */ + if (!step) steps_->erase(drvPath); // remove stale entry + } + + /* If it doesn't exist, create it. */ + if (!step) { + step = std::make_shared(); + step->drvPath = drvPath; + isNew = true; + } + + auto step_(step->state.lock()); + + assert(step_->created != isNew); + + if (referringBuild) + step_->builds.push_back(referringBuild); + + if (referringStep) + step_->rdeps.push_back(referringStep); + + (*steps_)[drvPath] = step; + } + + if (!isNew) return step; + + printMsg(lvlDebug, format("considering derivation ‘%1%’") % drvPath); + + /* Initialize the step. Note that the step may be visible in + ‘steps’ before this point, but that doesn't matter because + it's not runnable yet, and other threads won't make it + runnable while step->created == false. */ + step->drv = readDerivation(drvPath); + { + auto i = step->drv.env.find("requiredSystemFeatures"); + if (i != step->drv.env.end()) + step->requiredSystemFeatures = tokenizeString>(i->second); + } + + auto attr = step->drv.env.find("preferLocalBuild"); + step->preferLocalBuild = + attr != step->drv.env.end() && attr->second == "1" + && has(localPlatforms, step->drv.platform); + + /* Are all outputs valid? */ + bool valid = true; + for (auto & i : step->drv.outputs) { + if (!store->isValidPath(i.second.path)) { + valid = false; + break; + } + } + + // FIXME: check whether all outputs are in the binary cache. + if (valid) { + finishedDrvs.insert(drvPath); + return 0; + } + + /* No, we need to build. */ + printMsg(lvlDebug, format("creating build step ‘%1%’") % drvPath); + newSteps.insert(step); + + /* Create steps for the dependencies. */ + for (auto & i : step->drv.inputDrvs) { + auto dep = createStep(store, i.first, 0, step, finishedDrvs, newSteps, newRunnable); + if (dep) { + auto step_(step->state.lock()); + step_->deps.insert(dep); + } + } + + /* If the step has no (remaining) dependencies, make it + runnable. */ + { + auto step_(step->state.lock()); + assert(!step_->created); + step_->created = true; + if (step_->deps.empty()) + newRunnable.insert(step); + } + + return step; +} diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index 1c1a91b4..fa352c6e 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -118,6 +118,9 @@ struct Step }; +void getDependents(Step::ptr step, std::set & builds, std::set & steps); + + struct Machine { typedef std::shared_ptr ptr; @@ -159,6 +162,12 @@ class State { private: + // FIXME: Make configurable. + const unsigned int maxTries = 5; + const unsigned int retryInterval = 60; // seconds + const float retryBackoff = 3.0; + const unsigned int maxParallelCopyClosure = 4; + nix::Path hydraData, logDir; nix::StringSet localPlatforms; @@ -306,3 +315,10 @@ public: void run(BuildID buildOne = 0); }; + + +template +bool has(const C & c, const V & v) +{ + return c.find(v) != c.end(); +}