Support cancelling builds

This commit is contained in:
Eelco Dolstra 2015-06-11 18:07:45 +02:00
parent c08883966c
commit c974fb893b
2 changed files with 34 additions and 10 deletions

View file

@ -210,7 +210,9 @@ public:
void queueMonitor(); void queueMonitor();
void getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId); void getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
void removeCancelledBuilds(Connection & conn);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable); std::set<Step::ptr> & newRunnable);
@ -389,7 +391,7 @@ void State::queueMonitor()
unsigned int lastBuildId = 0; unsigned int lastBuildId = 0;
while (true) { while (true) {
getQueuedBuilds(store, lastBuildId); getQueuedBuilds(*conn, store, lastBuildId);
/* Sleep until we get notification from the database about an /* Sleep until we get notification from the database about an
event. */ event. */
@ -403,6 +405,7 @@ void State::queueMonitor()
} }
if (buildsCancelled.get()) { if (buildsCancelled.get()) {
printMsg(lvlError, "got notification: builds cancelled"); printMsg(lvlError, "got notification: builds cancelled");
removeCancelledBuilds(*conn);
} }
} }
@ -410,21 +413,17 @@ void State::queueMonitor()
} }
void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId) void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{ {
printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId); printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId);
auto conn(dbPool.get());
/* Grab the queued builds from the database, but don't process /* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */ them yet (since we don't want a long-running transaction). */
std::list<Build::ptr> newBuilds; // FIXME: use queue std::list<Build::ptr> newBuilds; // FIXME: use queue
{ {
pqxx::work txn(*conn); pqxx::work txn(conn);
// FIXME: query only builds with ID higher than the previous
// highest.
auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
for (auto const & row : res) { for (auto const & row : res) {
@ -454,7 +453,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & last
if (!store->isValidPath(build->drvPath)) { if (!store->isValidPath(build->drvPath)) {
/* Derivation has been GC'ed prematurely. */ /* Derivation has been GC'ed prematurely. */
printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id); printMsg(lvlInfo, format("aborting GC'ed build %1%") % build->id);
pqxx::work txn(*conn); pqxx::work txn(conn);
txn.parameterized txn.parameterized
("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1") ("update Builds set finished = 1, busy = 0, buildStatus = $2, startTime = $3, stopTime = $3, errorMsg = $4 where id = $1")
(build->id) (build->id)
@ -476,7 +475,7 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & last
printMsg(lvlInfo, format("cached build %1%") % build->id); printMsg(lvlInfo, format("cached build %1%") % build->id);
pqxx::work txn(*conn); pqxx::work txn(conn);
time_t now = time(0); time_t now = time(0);
markSucceededBuild(txn, build, res, true, now, now); markSucceededBuild(txn, build, res, true, now, now);
txn.commit(); txn.commit();
@ -511,6 +510,30 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & last
} }
void State::removeCancelledBuilds(Connection & conn)
{
/* Get the current set of queued builds. */
std::set<BuildID> currentIds;
{
pqxx::work txn(conn);
auto res = txn.exec("select id from Builds where finished = 0");
for (auto const & row : res)
currentIds.insert(row["id"].as<BuildID>());
}
auto builds_(builds.lock());
for (auto i = builds_->begin(); i != builds_->end(); ) {
if (currentIds.find(i->first) == currentIds.end()) {
printMsg(lvlInfo, format("discarding cancelled build %1%") % i->first);
i = builds_->erase(i);
// FIXME: ideally we would interrupt active build steps here.
} else
++i;
}
}
Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable) std::set<Step::ptr> & newRunnable)
{ {

View file

@ -433,6 +433,7 @@ sub cancelBuilds($$) {
, starttime => $time , starttime => $time
, stoptime => $time , stoptime => $time
}); });
$db->storage->dbh->do("notify builds_cancelled");
return $n; return $n;
}); });
} }