diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 4e0b9c53..94d36e44 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -210,7 +210,9 @@ public: void queueMonitor(); - void getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId); + void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); + + void removeCancelledBuilds(Connection & conn); Step::ptr createStep(std::shared_ptr store, const Path & drvPath, std::set & newRunnable); @@ -389,7 +391,7 @@ void State::queueMonitor() unsigned int lastBuildId = 0; while (true) { - getQueuedBuilds(store, lastBuildId); + getQueuedBuilds(*conn, store, lastBuildId); /* Sleep until we get notification from the database about an event. */ @@ -403,6 +405,7 @@ void State::queueMonitor() } if (buildsCancelled.get()) { printMsg(lvlError, "got notification: builds cancelled"); + removeCancelledBuilds(*conn); } } @@ -410,21 +413,17 @@ void State::queueMonitor() } -void State::getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId) +void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) { printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId); - auto conn(dbPool.get()); - /* Grab the queued builds from the database, but don't process them yet (since we don't want a long-running transaction). */ std::list newBuilds; // FIXME: use queue { - pqxx::work txn(*conn); + pqxx::work txn(conn); - // FIXME: query only builds with ID higher than the previous - // highest. auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); for (auto const & row : res) { @@ -454,7 +453,7 @@ void State::getQueuedBuilds(std::shared_ptr store, unsigned int & last if (!store->isValidPath(build->drvPath)) { /* Derivation has been GC'ed prematurely. */ printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id); - pqxx::work txn(*conn); + pqxx::work txn(conn); txn.parameterized ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") (build->id) @@ -476,7 +475,7 @@ void State::getQueuedBuilds(std::shared_ptr store, unsigned int & last printMsg(lvlInfo, format("cached build %1%") % build->id); - pqxx::work txn(*conn); + pqxx::work txn(conn); time_t now = time(0); markSucceededBuild(txn, build, res, true, now, now); txn.commit(); @@ -511,6 +510,30 @@ void State::getQueuedBuilds(std::shared_ptr store, unsigned int & last } +void State::removeCancelledBuilds(Connection & conn) +{ + /* Get the current set of queued builds. */ + std::set currentIds; + { + pqxx::work txn(conn); + auto res = txn.exec("select id from Builds where finished = 0"); + for (auto const & row : res) + currentIds.insert(row["id"].as()); + } + + auto builds_(builds.lock()); + + for (auto i = builds_->begin(); i != builds_->end(); ) { + if (currentIds.find(i->first) == currentIds.end()) { + printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first); + i = builds_->erase(i); + // FIXME: ideally we would interrupt active build steps here. + } else + ++i; + } +} + + Step::ptr State::createStep(std::shared_ptr store, const Path & drvPath, std::set & newRunnable) { diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm index 9523ddaa..c54c8f10 100644 --- a/src/lib/Hydra/Helper/Nix.pm +++ b/src/lib/Hydra/Helper/Nix.pm @@ -433,6 +433,7 @@ sub cancelBuilds($$) { , starttime => $time , stoptime => $time }); + $db->storage->dbh->do("notify builds_cancelled"); return $n; }); }