Fix race between the queue monitor and the builder threads

This commit is contained in:
Eelco Dolstra 2015-06-18 16:30:28 +02:00
parent 9c03b11ca8
commit 948473c909

View file

@ -120,9 +120,7 @@ struct Build
std::shared_ptr<Step> toplevel; std::shared_ptr<Step> toplevel;
bool finishedInDB; std::atomic_bool finishedInDB{false};
Build() : finishedInDB(false) { }
~Build() ~Build()
{ {
@ -158,13 +156,15 @@ struct Step
system_time after; system_time after;
}; };
std::atomic_bool created{false}; // debugging
std::atomic_bool finished{false}; // debugging
Sync<State> state; Sync<State> state;
std::atomic_bool destroyed; ~Step()
{
Step() : destroyed(false) { } printMsg(lvlError, format("destroying step %1%") % drvPath);
}
~Step() { }
}; };
@ -280,13 +280,9 @@ public:
void removeCancelledBuilds(Connection & conn); void removeCancelledBuilds(Connection & conn);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable); std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable);
void destroyStep(Step::ptr step, bool proceed);
/* Get the builds that depend on the given step. */
std::set<Build::ptr> getDependentBuilds(Step::ptr step);
void makeRunnable(Step::ptr step); void makeRunnable(Step::ptr step);
/* The thread that selects and starts runnable builds. */ /* The thread that selects and starts runnable builds. */
@ -525,6 +521,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
/* Derivation has been GC'ed prematurely. */ /* Derivation has been GC'ed prematurely. */
printMsg(lvlError, format("aborting GC'ed build %1%") % build->id); printMsg(lvlError, format("aborting GC'ed build %1%") % build->id);
pqxx::work txn(conn); pqxx::work txn(conn);
assert(!build->finishedInDB);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1")
(build->id) (build->id)
@ -538,7 +535,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
} }
std::set<Step::ptr> newSteps; std::set<Step::ptr> newSteps;
Step::ptr step = createStep(store, build->drvPath, newSteps, newRunnable); Step::ptr step = createStep(store, build->drvPath, build, 0, newSteps, newRunnable);
/* Some of the new steps may be the top level of builds that /* Some of the new steps may be the top level of builds that
we haven't processed yet. So do them now. This ensures that we haven't processed yet. So do them now. This ensures that
@ -560,13 +557,13 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
Derivation drv = readDerivation(build->drvPath); Derivation drv = readDerivation(build->drvPath);
BuildResult res = getBuildResult(store, drv); BuildResult res = getBuildResult(store, drv);
printMsg(lvlInfo, format("marking build %1% as cached successful") % build->id);
pqxx::work txn(conn); pqxx::work txn(conn);
time_t now = time(0); time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
txn.commit(); txn.commit();
build->finishedInDB = true;
return; return;
} }
@ -603,6 +600,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
time_t now = time(0); time_t now = time(0);
pqxx::work txn(conn); pqxx::work txn(conn);
createBuildStep(txn, 0, build, r, "", buildStepStatus); createBuildStep(txn, 0, build, r, "", buildStepStatus);
assert(!build->finishedInDB);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, isCachedBuild = $4 where id = $1")
(build->id) (build->id)
@ -624,20 +622,12 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
{ {
auto builds_(builds.lock()); auto builds_(builds.lock());
auto step_(step->state.lock());
(*builds_)[build->id] = build; (*builds_)[build->id] = build;
step_->builds.push_back(build);
build->toplevel = step; build->toplevel = step;
} }
printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)") printMsg(lvlChatty, format("added build %1% (top-level step %2%, %3% new steps)")
% build->id % step->drvPath % newSteps.size()); % build->id % step->drvPath % newSteps.size());
/* Prior to this, the build is not visible to
getDependentBuilds(). Now it is, so the build can be
failed if a dependency fails. (It can't succeed right away
because its top-level is not runnable yet). */
}; };
/* Now instantiate build steps for each new build. The builder /* Now instantiate build steps for each new build. The builder
@ -687,32 +677,65 @@ void State::removeCancelledBuilds(Connection & conn)
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
Build::ptr referringBuild, Step::ptr referringStep,
std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable) std::set<Step::ptr> & newSteps, std::set<Step::ptr> & newRunnable)
{ {
/* Check if the requested step already exists. */ /* Check if the requested step already exists. If not, create a
new step. In any case, make the step reachable from
referringBuild or referringStep. This is done atomically (with
steps locked), to ensure that this step can never become
reachable from a new build after doBuildStep has removed it
from steps. */
Step::ptr step;
bool isNew = false;
{ {
auto steps_(steps.lock()); auto steps_(steps.lock());
/* See if the step already exists in steps and is not
stale. */
auto prev = steps_->find(drvPath); auto prev = steps_->find(drvPath);
if (prev != steps_->end()) { if (prev != steps_->end()) {
auto step = prev->second.lock(); step = prev->second.lock();
/* Since step is a strong pointer, the referred Step /* Since step is a strong pointer, the referred Step
object won't be deleted after this. */ object won't be deleted after this. */
if (step) return step; if (!step) steps_->erase(drvPath); // remove stale entry
steps_->erase(drvPath); // remove stale entry
} }
/* If it doesn't exist, create it. */
if (!step) {
step = std::make_shared<Step>();
step->drvPath = drvPath;
isNew = true;
}
auto step_(step->state.lock());
if (referringBuild)
step_->builds.push_back(referringBuild);
if (referringStep)
step_->rdeps.push_back(referringStep);
(*steps_)[drvPath] = step;
} }
printMsg(lvlDebug, format("considering derivation %1%") % drvPath); printMsg(lvlDebug, format("considering derivation %1%") % drvPath);
auto step = std::make_shared<Step>(); if (!isNew) {
step->drvPath = drvPath; assert(step->created);
return step;
}
/* Initialize the step. Note that the step may be visible in
steps before this point, but that doesn't matter because
it's not runnable yet, and other threads won't make it
runnable while step->created == false. */
step->drv = readDerivation(drvPath); step->drv = readDerivation(drvPath);
{ {
auto i = step->drv.env.find("requiredSystemFeatures"); auto i = step->drv.env.find("requiredSystemFeatures");
if (i != step->drv.env.end()) if (i != step->drv.env.end())
step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second); step->requiredSystemFeatures = tokenizeString<std::set<std::string>>(i->second);
} }
newSteps.insert(step);
/* Are all outputs valid? */ /* Are all outputs valid? */
bool valid = true; bool valid = true;
@ -728,94 +751,39 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
/* No, we need to build. */ /* No, we need to build. */
printMsg(lvlDebug, format("creating build step %1%") % drvPath); printMsg(lvlDebug, format("creating build step %1%") % drvPath);
newSteps.insert(step);
/* Create steps for the dependencies. */ /* Create steps for the dependencies. */
bool hasDeps = false;
for (auto & i : step->drv.inputDrvs) { for (auto & i : step->drv.inputDrvs) {
Step::ptr dep = createStep(store, i.first, newSteps, newRunnable); auto dep = createStep(store, i.first, 0, step, newSteps, newRunnable);
if (dep) { if (dep) {
hasDeps = true;
auto step_(step->state.lock()); auto step_(step->state.lock());
auto dep_(dep->state.lock());
step_->deps.insert(dep); step_->deps.insert(dep);
dep_->rdeps.push_back(step);
} }
} }
/* If the step has no (remaining) dependencies, make it
runnable. */
{ {
auto steps_(steps.lock()); auto step_(step->state.lock());
assert(steps_->find(drvPath) == steps_->end()); assert(!step->created);
(*steps_)[drvPath] = step; step->created = true;
if (step_->deps.empty())
newRunnable.insert(step);
} }
if (!hasDeps) newRunnable.insert(step);
return step; return step;
} }
void State::destroyStep(Step::ptr step, bool proceed) /* Get the steps and unfinished builds that depend on the given step. */
void getDependents(Step::ptr step, std::set<Build::ptr> & builds, std::set<Step::ptr> & steps)
{ {
if (step->destroyed) return;
step->destroyed = true;
printMsg(lvlDebug, format("destroying build step %1%") % step->drvPath);
nrStepsDone++;
{
auto steps_(steps.lock());
steps_->erase(step->drvPath);
}
std::vector<Step::wptr> rdeps;
{
auto step_(step->state.lock());
rdeps = step_->rdeps;
/* Sanity checks. */
for (auto & build_ : step_->builds) {
auto build = build_.lock();
if (!build) continue;
assert(build->drvPath == step->drvPath);
assert(build->finishedInDB);
}
}
for (auto & rdep_ : rdeps) {
auto rdep = rdep_.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
assert(has(rdep_->deps, step));
rdep_->deps.erase(step);
if (rdep_->deps.empty()) runnable = true;
}
if (proceed) {
/* If this rdep has no other dependencies, then we can now
build it. */
if (runnable)
makeRunnable(rdep);
} else
/* If step failed or was cancelled, then delete all
dependent steps as well. */
destroyStep(rdep, false);
}
}
std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
{
std::set<Step::ptr> done;
std::set<Build::ptr> res;
std::function<void(Step::ptr)> visit; std::function<void(Step::ptr)> visit;
visit = [&](Step::ptr step) { visit = [&](Step::ptr step) {
if (has(done, step)) return; if (has(steps, step)) return;
done.insert(step); steps.insert(step);
std::vector<Step::wptr> rdeps; std::vector<Step::wptr> rdeps;
@ -824,7 +792,7 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
for (auto & build : step_->builds) { for (auto & build : step_->builds) {
auto build_ = build.lock(); auto build_ = build.lock();
if (build_) res.insert(build_); if (build_ && !build_->finishedInDB) builds.insert(build_);
} }
/* Make a copy of rdeps so that we don't hold the lock for /* Make a copy of rdeps so that we don't hold the lock for
@ -839,8 +807,6 @@ std::set<Build::ptr> State::getDependentBuilds(Step::ptr step)
}; };
visit(step); visit(step);
return res;
} }
@ -850,6 +816,8 @@ void State::makeRunnable(Step::ptr step)
{ {
auto step_(step->state.lock()); auto step_(step->state.lock());
assert(step->created);
assert(!step->finished);
assert(step_->deps.empty()); assert(step_->deps.empty());
} }
@ -913,7 +881,7 @@ void State::dispatcher()
if (machine->currentJobs >= machine->maxJobs) continue; if (machine->currentJobs >= machine->maxJobs) continue;
auto runnable_(runnable.lock()); auto runnable_(runnable.lock());
printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size()); //printMsg(lvlDebug, format("%1% runnable builds") % runnable_->size());
/* FIXME: we're holding the runnable lock too long /* FIXME: we're holding the runnable lock too long
here. This could be more efficient. */ here. This could be more efficient. */
@ -1024,6 +992,12 @@ void State::builder(Step::ptr step, MachineReservation::ptr reservation)
bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step, bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Machine::ptr machine) Machine::ptr machine)
{ {
{
auto step_(step->state.lock());
assert(step->created);
assert(!step->finished);
}
/* There can be any number of builds in the database that depend /* There can be any number of builds in the database that depend
on this derivation. Arbitrarily pick one (though preferring a on this derivation. Arbitrarily pick one (though preferring a
build of which this is the top-level derivation) for the build of which this is the top-level derivation) for the
@ -1034,7 +1008,9 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
Build::ptr build; Build::ptr build;
{ {
auto dependents = getDependentBuilds(step); std::set<Build::ptr> dependents;
std::set<Step::ptr> steps;
getDependents(step, dependents, steps);
if (dependents.empty()) { if (dependents.empty()) {
/* Apparently all builds that depend on this derivation /* Apparently all builds that depend on this derivation
@ -1117,112 +1093,176 @@ bool State::doBuildStep(std::shared_ptr<StoreAPI> store, Step::ptr step,
} }
} }
/* Remove this step. After this, incoming builds that depend on if (result.status == RemoteResult::rrSuccess) {
drvPath will either see that the output paths exist, or will
create a new build step for drvPath. The latter is fine - it
won't conflict with this one, because we're removing it. In any
case, the set of dependent builds for step can't increase
anymore because step is no longer visible to createStep(). */
auto steps_(steps.lock());
steps_->erase(step->drvPath);
/* Get the final set of dependent builds. */ /* Register success in the database for all Build objects that
auto dependents = getDependentBuilds(step); have this step as the top-level step. Since the queue
monitor thread may be creating new referring Builds
concurrently, and updating the database may fail, we do
this in a loop, marking all known builds, repeating until
there are no unmarked builds.
*/
while (true) {
std::set<Build::ptr> direct; /* Get the builds that have this one as the top-level. */
{ std::vector<Build::ptr> direct;
auto step_(step->state.lock()); {
for (auto & build : step_->builds) { auto steps_(steps.lock());
auto build_ = build.lock(); auto step_(step->state.lock());
if (build_) direct.insert(build_);
}
}
/* Update the database. */ for (auto & b_ : step_->builds) {
{ auto b = b_.lock();
pqxx::work txn(*conn); if (b && !b->finishedInDB) direct.push_back(b);
}
if (result.status == RemoteResult::rrSuccess) { /* If there are no builds left to update in the DB,
then we're done. Delete the step from
finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess); steps. Since we've been holding the steps lock,
no new referrers can have been added in the
/* Mark all builds of which this derivation is the top meantime or be added afterwards. */
level as succeeded. */ if (direct.empty()) {
for (auto build2 : direct) printMsg(lvlDebug, format("finishing build step %1%") % step->drvPath);
markSucceededBuild(txn, build2, res, build != build2, nrStepsDone++;
result.startTime, result.stopTime); steps_->erase(step->drvPath);
break;
} else { }
/* Failure case. */
BuildStatus buildStatus =
result.status == RemoteResult::rrPermanentFailure ? bsFailed :
result.status == RemoteResult::rrTimedOut ? bsTimedOut :
bsAborted;
BuildStepStatus buildStepStatus =
result.status == RemoteResult::rrPermanentFailure ? bssFailed :
result.status == RemoteResult::rrTimedOut ? bssTimedOut :
bssAborted;
/* For regular failures, we don't care about the error
message. */
if (buildStatus != bsAborted) result.errorMsg = "";
/* Create failed build steps for every build that depends
on this. For cached failures, only create a step for
builds that don't have this step as top-level
(otherwise the user won't be able to see what caused
the build to fail). */
for (auto build2 : dependents) {
if (build == build2) continue;
if (cachedFailure && build2->drvPath == step->drvPath) continue;
createBuildStep(txn, 0, build2, step, machine->sshName,
buildStepStatus, result.errorMsg, build->id);
} }
if (!cachedFailure) /* Update the database. */
finishBuildStep(txn, result.startTime, result.stopTime, build->id, {
stepNr, machine->sshName, buildStepStatus, result.errorMsg); pqxx::work txn(*conn);
/* Mark all builds that depend on this derivation as failed. */ finishBuildStep(txn, result.startTime, result.stopTime, build->id, stepNr, machine->sshName, bssSuccess);
for (auto build2 : dependents) {
printMsg(lvlError, format("marking build %1% as failed") % build2->id); for (auto & b : direct)
txn.parameterized markSucceededBuild(txn, b, res, build != b,
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1") result.startTime, result.stopTime);
(build2->id)
((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus)) txn.commit();
(result.startTime)
(result.stopTime)
(cachedFailure ? 1 : 0).exec();
build2->finishedInDB = true; // FIXME: txn might fail
nrBuildsDone++;
} }
/* Remember failed paths in the database so that they /* Remove the direct dependencies from builds. This will
won't be built again. */ cause them to be destroyed. */
if (!cachedFailure && result.status == RemoteResult::rrPermanentFailure) for (auto & b : direct) {
for (auto & path : outputPaths(step->drv)) auto builds_(builds.lock());
txn.parameterized("insert into FailedPaths values ($1)")(path).exec(); b->finishedInDB = true;
builds_->erase(b->id);
}
}
/* Wake up any dependent steps that have no other
dependencies. */
{
auto step_(step->state.lock());
for (auto & rdepWeak : step_->rdeps) {
auto rdep = rdepWeak.lock();
if (!rdep) continue;
bool runnable = false;
{
auto rdep_(rdep->state.lock());
rdep_->deps.erase(step);
if (rdep_->deps.empty()) runnable = true;
}
if (runnable) makeRunnable(rdep);
}
}
} else {
/* Register failure in the database for all Build objects that
directly or indirectly depend on this step. */
while (true) {
/* Get the builds and steps that depend on this step. */
std::set<Build::ptr> indirect;
{
auto steps_(steps.lock());
std::set<Step::ptr> steps;
getDependents(step, indirect, steps);
/* If there are no builds left, delete all referring
steps from steps. As in the success case, we can
be certain that no new referrers can be added. */
if (indirect.empty()) {
for (auto & s : steps) {
printMsg(lvlDebug, format("finishing build step %1%") % step->drvPath);
nrStepsDone++;
steps_->erase(s->drvPath);
}
break;
}
}
/* Update the database. */
{
pqxx::work txn(*conn);
BuildStatus buildStatus =
result.status == RemoteResult::rrPermanentFailure ? bsFailed :
result.status == RemoteResult::rrTimedOut ? bsTimedOut :
bsAborted;
BuildStepStatus buildStepStatus =
result.status == RemoteResult::rrPermanentFailure ? bssFailed :
result.status == RemoteResult::rrTimedOut ? bssTimedOut :
bssAborted;
/* For regular failures, we don't care about the error
message. */
if (buildStatus != bsAborted) result.errorMsg = "";
/* Create failed build steps for every build that depends
on this. For cached failures, only create a step for
builds that don't have this step as top-level
(otherwise the user won't be able to see what caused
the build to fail). */
for (auto & build2 : indirect) {
if (build == build2) continue;
if (cachedFailure && build2->drvPath == step->drvPath) continue;
createBuildStep(txn, 0, build2, step, machine->sshName,
buildStepStatus, result.errorMsg, build->id);
}
if (!cachedFailure)
finishBuildStep(txn, result.startTime, result.stopTime, build->id,
stepNr, machine->sshName, buildStepStatus, result.errorMsg);
/* Mark all builds that depend on this derivation as failed. */
for (auto & build2 : indirect) {
printMsg(lvlError, format("marking build %1% as failed") % build2->id);
assert(!build->finishedInDB);
txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, isCachedBuild = $5 where id = $1")
(build2->id)
((int) (build2->drvPath != step->drvPath && buildStatus == bsFailed ? bsDepFailed : buildStatus))
(result.startTime)
(result.stopTime)
(cachedFailure ? 1 : 0).exec();
nrBuildsDone++;
}
/* Remember failed paths in the database so that they
won't be built again. */
if (!cachedFailure && result.status == RemoteResult::rrPermanentFailure)
for (auto & path : outputPaths(step->drv))
txn.parameterized("insert into FailedPaths values ($1)")(path).exec();
txn.commit();
}
/* Remove the directly and indirectly dependent builds from
builds. This will cause them to be destroyed. */
for (auto & b : indirect) {
auto builds_(builds.lock());
b->finishedInDB = true;
builds_->erase(b->id);
}
} }
txn.commit();
} }
/* In case of success, destroy all Build objects of which step
is the top-level derivation. In case of failure, destroy all
dependent Build objects. Any Steps not referenced by other
Builds will be destroyed as well. */
for (auto build2 : dependents)
if (build2->toplevel == step || result.status != RemoteResult::rrSuccess) {
auto builds_(builds.lock());
builds_->erase(build2->id);
}
/* Remove the step from the graph. In case of success, make
dependent build steps runnable if they have no other
dependencies. */
destroyStep(step, result.status == RemoteResult::rrSuccess);
return false; return false;
} }
@ -1232,6 +1272,8 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
{ {
printMsg(lvlInfo, format("marking build %1% as succeeded") % build->id); printMsg(lvlInfo, format("marking build %1% as succeeded") % build->id);
assert(!build->finishedInDB);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $4, size = $5, closureSize = $6, releaseName = $7, isCachedBuild = $8 where id = $1")
(build->id) (build->id)
@ -1259,7 +1301,6 @@ void State::markSucceededBuild(pqxx::work & txn, Build::ptr build,
(product.defaultPath).exec(); (product.defaultPath).exec();
} }
build->finishedInDB = true; // FIXME: txn might fail
nrBuildsDone++; nrBuildsDone++;
} }