Make concurrency more robust

This commit is contained in:
Eelco Dolstra 2015-05-29 17:14:20 +02:00
parent 8640e30787
commit e778821940
2 changed files with 321 additions and 98 deletions

View file

@ -8,6 +8,8 @@
#include <pqxx/pqxx>
#include "build-result.hh"
#include "sync.hh"
#include "store-api.hh"
#include "derivations.hh"
#include "shared.hh"
@ -16,6 +18,13 @@
using namespace nix;
template <class C, class V>
bool has(const C & c, const V & v)
{
return c.find(v) != c.end();
}
std::mutex exitRequestMutex;
std::condition_variable exitRequest;
bool exitRequested(false);
@ -76,6 +85,9 @@ struct Connection : pqxx::connection
typedef unsigned int BuildID;
struct Step;
struct Build
{
typedef std::shared_ptr<Build> ptr;
@ -84,10 +96,18 @@ struct Build
BuildID id;
Path drvPath;
std::map<string, Path> outputs;
std::string fullJobName;
std::shared_ptr<Step> toplevel;
bool finishedInDB;
Build() : finishedInDB(false) { }
~Build()
{
printMsg(lvlError, format("destroying build %1%") % id);
}
};
@ -95,9 +115,12 @@ struct Step
{
typedef std::shared_ptr<Step> ptr;
typedef std::weak_ptr<Step> wptr;
Path drvPath;
Derivation drv;
struct State
{
/* The build steps on which this step depends. */
std::set<Step::ptr> deps;
@ -106,6 +129,14 @@ struct Step
/* Builds that have this step as the top-level derivation. */
std::vector<Build::wptr> builds;
};
Sync<State> state;
~Step()
{
printMsg(lvlError, format("destroying step %1%") % drvPath);
}
};
@ -116,17 +147,21 @@ private:
std::thread queueMonitorThread;
/* The queued builds. */
std::map<BuildID, Build::ptr> builds;
typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds;
/* All active or pending build steps (i.e. dependencies of the
queued builds). */
std::map<Path, Step::ptr> steps;
queued builds). Note that these are weak pointers. Steps are
kept alive by being reachable from Builds or by being in
progress. */
typedef std::map<Path, Step::wptr> Steps;
Sync<Steps> steps;
/* Build steps that have no unbuilt dependencies. */
std::set<Step::ptr> runnable;
typedef std::list<Step::wptr> Runnable;
Sync<Runnable> runnable;
std::mutex runnableMutex;
std::condition_variable runnableCV;
std::condition_variable_any runnableCV;
public:
State();
@ -147,7 +182,8 @@ public:
void getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection & conn);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable);
void destroyStep(Step::ptr step, bool proceed);
@ -254,26 +290,55 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
{
printMsg(lvlError, "checking the queue...");
#if 0
{
auto runnable_(runnable.lock());
auto builds_(builds.lock());
auto steps_(steps.lock());
printMsg(lvlError, format("%1% builds, %2% steps, %3% runnable steps")
% builds_->size()
% steps_->size()
% runnable_->size());
}
#endif
/* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */
std::list<Build::ptr> newBuilds; // FIXME: use queue
{
pqxx::work txn(conn);
// FIXME: query only builds with ID higher than the previous
// highest.
auto res = txn.exec("select * from Builds where finished = 0");
auto res = txn.exec("select * from Builds where finished = 0 order by id");
auto builds_(builds.lock());
// FIXME: don't process inside a txn.
for (auto const & row : res) {
BuildID id = row["id"].as<BuildID>();
if (builds.find(id) != builds.end()) continue;
if (has(*builds_, id)) continue;
Build::ptr build(new Build);
auto build = std::make_shared<Build>();
build->id = id;
build->drvPath = row["drvPath"].as<string>();
build->fullJobName = row["project"].as<string>() + ":" + row["jobset"].as<string>() + ":" + row["job"].as<string>();
printMsg(lvlInfo, format("loading build %1% (%2%:%3%:%4%)") % id % row["project"] % row["jobset"] % row["job"]);
newBuilds.push_back(build);
}
}
/* Now instantiate build steps for each new build. The builder
threads can start building the runnable build steps right away,
even while we're still processing other new builds. */
for (auto & build : newBuilds) {
// FIXME: remove build from newBuilds to ensure quick destruction
// FIXME: exception handling
printMsg(lvlInfo, format("loading build %1% (%2%)") % build->id % build->fullJobName);
if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */
Connection conn;
pqxx::work txn(conn);
txn.parameterized
("update Builds set finished = 1, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1")
@ -285,12 +350,15 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
continue;
}
Step::ptr step = createStep(store, build->drvPath);
std::set<Step::ptr> newRunnable;
Step::ptr step = createStep(store, build->drvPath, newRunnable);
/* If we didn't get a step, it means the step's outputs are
all valid. So we mark this as a finished, cached build. */
if (!step) {
Derivation drv = readDerivation(build->drvPath);
BuildResult res = getBuildResult(store, drv);
Connection conn;
pqxx::work txn(conn);
time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now);
@ -299,21 +367,49 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, pqxx::connection &
continue;
}
step->builds.push_back(build);
/* Note: if we exit this scope prior to this, the build and
all newly created steps are destroyed. */
builds[id] = build;
{
auto builds_(builds.lock());
auto step_(step->state.lock());
(*builds_)[build->id] = build;
step_->builds.push_back(build);
build->toplevel = step;
}
/* Prior to this, the build is not visible to
getDependentBuilds(). Now it is, so the build can be
failed if a dependency fails. (It can't succeed right away
because its top-level is not runnable yet). */
/* Add the new runnable build steps to runnable and wake up
the builder threads. */
for (auto & r : newRunnable)
makeRunnable(r);
}
}
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath)
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable)
{
auto prev = steps.find(drvPath);
if (prev != steps.end()) return prev->second;
/* Check if the requested step already exists. */
{
auto steps_(steps.lock());
auto prev = steps_->find(drvPath);
if (prev != steps_->end()) {
auto step = prev->second.lock();
/* Since step is a strong pointer, the referred Step
object won't be deleted after this. */
if (step) return step;
steps_->erase(drvPath); // remove stale entry
}
}
printMsg(lvlInfo, format("considering derivation %1%") % drvPath);
Step::ptr step(new Step);
auto step = std::make_shared<Step>();
step->drvPath = drvPath;
step->drv = readDerivation(drvPath);
@ -333,17 +429,25 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
printMsg(lvlInfo, format("creating build step %1%") % drvPath);
/* Create steps for the dependencies. */
bool hasDeps = false;
for (auto & i : step->drv.inputDrvs) {
Step::ptr dep = createStep(store, i.first);
Step::ptr dep = createStep(store, i.first, newRunnable);
if (dep) {
step->deps.insert(dep);
dep->rdeps.push_back(step);
hasDeps = true;
auto step_(step->state.lock());
auto dep_(dep->state.lock());
step_->deps.insert(dep);
dep_->rdeps.push_back(step);
}
}
steps[drvPath] = step;
{
auto steps_(steps.lock());
assert(steps_->find(drvPath) == steps_->end());
(*steps_)[drvPath] = step;
}
if (step->deps.empty()) makeRunnable(step);
if (!hasDeps) newRunnable.insert(step);
return step;
}
@ -351,30 +455,48 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
void State::destroyStep(Step::ptr step, bool proceed)
{
steps.erase(step->drvPath);
printMsg(lvlInfo, format("destroying build step %1%") % step->drvPath);
for (auto & rdep_ : step->rdeps) {
auto rdep = rdep_.lock();
if (!rdep) continue;
assert(rdep->deps.find(step) != rdep->deps.end());
rdep->deps.erase(step);
if (proceed) {
/* If this rdep has no other dependencies, then we can now
build it. */
if (rdep->deps.empty())
makeRunnable(rdep);
} else
/* If step failed, then delete all dependent steps as
well. */
destroyStep(rdep, false);
{
auto steps_(steps.lock());
steps_->erase(step->drvPath);
}
for (auto & build_ : step->builds) {
std::vector<Step::wptr> rdeps;
{
auto step_(step->state.lock());
rdeps = step_->rdeps;
/* Sanity checks. */
for (auto & build_ : step_->builds) {
auto build = build_.lock();
if (!build) continue;
assert(build->drvPath == step->drvPath);
assert(build->finishedInDB);
}
}
for (auto & rdep_ : rdeps) {
auto rdep = rdep_.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
assert(has(rdep_->deps, step));
rdep_->deps.erase(step);
if (rdep_->deps.empty()) runnable = true;
}
if (proceed) {
/* If this rdep has no other dependencies, then we can now
build it. */
if (runnable)
makeRunnable(rdep);
} else
/* If step failed or was cancelled, then delete all
dependent steps as well. */
destroyStep(rdep, false);
}
}
@ -386,17 +508,27 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
std::function<void(Step::ptr)> visit;
visit = [&](Step::ptr step) {
if (done.find(step) != done.end()) return;
if (has(done, step)) return;
done.insert(step);
for (auto & build : step->builds) {
auto build2 = build.lock();
if (build2) res.insert(build2);
std::vector<Step::wptr> rdeps;
{
auto step_(step->state.lock());
for (auto & build : step_->builds) {
auto build_ = build.lock();
if (build_) res.insert(build_);
}
for (auto & rdep : step->rdeps) {
auto rdep2 = rdep.lock();
if (rdep2) visit(rdep2);
/* Make a copy of rdeps so that we don't hold the lock for
very long. */
rdeps = step_->rdeps;
}
for (auto & rdep : rdeps) {
auto rdep_ = rdep.lock();
if (rdep_) visit(rdep_);
}
};
@ -408,11 +540,14 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
void State::makeRunnable(Step::ptr step)
{
assert(step->deps.empty());
{
auto step_(step->state.lock());
assert(step_->deps.empty());
}
{
std::lock_guard<std::mutex> lock(runnableMutex);
runnable.insert(step);
auto runnable_(runnable.lock());
runnable_->push_back(step);
}
runnableCV.notify_one();
@ -424,17 +559,20 @@ void State::builderThreadEntry(int slot)
auto store = openStore(); // FIXME: pool
while (true) {
/* Sleep until a runnable build step becomes available. */
Step::ptr step;
{
std::unique_lock<std::mutex> lock(runnableMutex);
while (runnable.empty())
runnableCV.wait(lock);
step = *runnable.begin();
runnable.erase(step);
auto runnable_(runnable.lock());
while (runnable_->empty())
runnable_.wait(runnableCV);
auto weak = *runnable_->begin();
runnable_->pop_front();
step = weak.lock();
if (!step) continue;
}
/* Build it. */
printMsg(lvlError, format("slot %1%: got build step %2%") % slot % step->drvPath);
doBuildStep(store, step);
}
@ -444,33 +582,37 @@ void State::builderThreadEntry(int slot)
void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
{
assert(step->deps.empty());
/* There can be any number of builds in the database that depend
on this derivation. Arbitrarily pick one (though preferring
those build of which this is the top-level derivation) for the
on this derivation. Arbitrarily pick one (though preferring a
build of which this is the top-level derivation) for the
purpose of creating build steps. We could create a build step
record for every build, but that could be very expensive
(e.g. a stdenv derivation can be a dependency of tens of
thousands of builds), so we don't. */
Build::ptr build;
auto builds = getDependentBuilds(step);
{
auto dependents = getDependentBuilds(step);
if (builds.empty()) {
/* Apparently all builds that depend on this derivation are
gone (e.g. cancelled). So don't bother. */
if (dependents.empty()) {
/* Apparently all builds that depend on this derivation
are gone (e.g. cancelled). So don't bother. (This is
very unlikely to happen, because normally Steps are
only kept alive by being reachable from a
Build). FIXME: what if a new Build gets a reference to
this step? */
printMsg(lvlInfo, format("cancelling build step %1%") % step->drvPath);
destroyStep(step, true);
destroyStep(step, false);
return;
}
for (auto build2 : builds)
for (auto build2 : dependents)
if (build2->drvPath == step->drvPath) { build = build2; break; }
if (!build) build = *builds.begin();
if (!build) build = *dependents.begin();
printMsg(lvlInfo, format("performing build step %1% (needed by %2% builds)") % step->drvPath % builds.size());
printMsg(lvlInfo, format("performing build step %1% (needed by %2% builds)") % step->drvPath % dependents.size());
}
/* Create a build step record indicating that we started
building. */
@ -499,8 +641,30 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
// FIXME: handle failed-with-output
// FIXME: handle new builds having been added in the meantime.
/* Remove this step. After this, incoming builds that depend on
drvPath will either see that the output paths exist, or will
create a new build step for drvPath. The latter is fine - it
won't conflict with this one, because we're removing it. In any
case, the set of dependent builds for step can't increase
anymore because step is no longer visible to createStep(). */
{
auto steps_(steps.lock());
steps_->erase(step->drvPath);
}
/* Get the final set of dependent builds. */
auto dependents = getDependentBuilds(step);
std::set<Build::ptr> direct;
{
auto step_(step->state.lock());
for (auto & build : step_->builds) {
auto build_ = build.lock();
if (build_) direct.insert(build_);
}
}
/* Update the database. */
{
pqxx::work txn(conn);
@ -510,24 +674,21 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
/* Mark all builds of which this derivation is the top
level as succeeded. */
for (auto build2_ : step->builds) {
auto build2 = build2_.lock();
if (!build2) continue;
for (auto build2 : direct)
markSucceededBuild(txn, build2, res, false, startTime, stopTime);
}
} else {
/* Create failed build steps for every build that depends
on this. */
finishBuildStep(txn, stopTime, build->id, stepNr, bssFailed, errorMsg);
for (auto build2 : builds) {
for (auto build2 : dependents) {
if (build == build2) continue;
createBuildStep(txn, stopTime, build2, step, bssFailed, errorMsg, build->id);
}
/* Mark all builds that depend on this derivation as failed. */
for (auto build2 : builds) {
for (auto build2 : dependents) {
txn.parameterized
("update Builds set finished = 1, isCachedBuild = 0, buildStatus = $2, startTime = $3, stopTime = $4 where id = $1")
(build2->id)
@ -539,10 +700,21 @@ void State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step)
}
txn.commit();
}
/* Remove the build step from the graph. */
/* In case of success, destroy all Build objects of which step
is the top-level derivation. In case of failure, destroy all
dependent Build objects. Any Steps not referenced by other
Builds will be destroyed as well. */
for (auto build2 : dependents)
if (build2->toplevel == step || !success) {
auto builds_(builds.lock());
builds_->erase(build2->id);
}
/* Remove the step from the graph. In case of success, make
dependent build steps runnable if they have no other
dependencies. */
destroyStep(step, success);
}
@ -590,8 +762,6 @@ void State::run()
queueMonitorThread = std::thread(&State::queueMonitorThreadEntry, this);
sleep(1);
for (int n = 0; n < 4; n++)
std::thread(&State::builderThreadEntry, this, n).detach();

View file

@ -0,0 +1,53 @@
#pragma once
#include <mutex>
#include <condition_variable>
/* This template class ensures synchronized access to a value of type
T. It is used as follows:
struct Data { int x; ... };
Sync<Data> data;
{
auto data_(data.lock());
data_->x = 123;
}
Here, "data" is automatically unlocked when "data_" goes out of
scope.
*/
template <class T>
class Sync
{
private:
std::mutex mutex;
T data;
public:
class Lock
{
private:
Sync * s;
friend Sync;
Lock(Sync * s) : s(s) { s->mutex.lock(); }
public:
Lock(Lock && l) : s(l.s) { l.s = 0; }
Lock(const Lock & l) = delete;
~Lock() { if (s) s->mutex.unlock(); }
T * operator -> () { return &s->data; }
T & operator * () { return s->data; }
/* FIXME: performance impact of condition_variable_any? */
void wait(std::condition_variable_any & cv)
{
assert(s);
cv.wait(s->mutex);
}
};
Lock lock() { return Lock(this); }
};