diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 9e5553b6..4e0b9c53 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -156,10 +156,6 @@ private: Path hydraData, logDir; - /* CV for waking up the queue. */ - std::condition_variable queueMonitorWakeup; - std::mutex queueMonitorMutex; - /* The queued builds. */ typedef std::map Builds; Sync builds; @@ -214,7 +210,7 @@ public: void queueMonitor(); - void getQueuedBuilds(std::shared_ptr store); + void getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId); Step::ptr createStep(std::shared_ptr store, const Path & drvPath, std::set & newRunnable); @@ -366,14 +362,47 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, void State::queueMonitor() { + auto conn(dbPool.get()); + + struct receiver : public pqxx::notification_receiver + { + bool status = false; + receiver(pqxx::connection_base & c, const std::string & channel) + : pqxx::notification_receiver(c, channel) { } + void operator() (const string & payload, int pid) override + { + status = true; + }; + bool get() { + bool b = status; + status = false; + return b; + } + }; + + receiver buildsAdded(*conn, "builds_added"); + receiver buildsRestarted(*conn, "builds_restarted"); + receiver buildsCancelled(*conn, "builds_cancelled"); + auto store = openStore(); // FIXME: pool - while (true) { - getQueuedBuilds(store); + unsigned int lastBuildId = 0; - { - std::unique_lock lock(queueMonitorMutex); - queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5)); + while (true) { + getQueuedBuilds(store, lastBuildId); + + /* Sleep until we get notification from the database about an + event. */ + conn->await_notification(); + + if (buildsAdded.get()) + printMsg(lvlError, "got notification: new builds added to the queue"); + if (buildsRestarted.get()) { + printMsg(lvlError, "got notification: builds restarted"); + lastBuildId = 0; // check all builds + } + if (buildsCancelled.get()) { + printMsg(lvlError, "got notification: builds cancelled"); } } @@ -381,9 +410,9 @@ void State::queueMonitor() } -void State::getQueuedBuilds(std::shared_ptr store) +void State::getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId) { - printMsg(lvlError, "checking the queue..."); + printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId); auto conn(dbPool.get()); @@ -396,11 +425,12 @@ void State::getQueuedBuilds(std::shared_ptr store) // FIXME: query only builds with ID higher than the previous // highest. - auto res = txn.exec("select * from Builds where finished = 0 order by id"); + auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); for (auto const & row : res) { auto builds_(builds.lock()); BuildID id = row["id"].as(); + if (id > lastBuildId) lastBuildId = id; if (has(*builds_, id)) continue; auto build = std::make_shared(); diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh index 0a58ebe0..a1cd3977 100644 --- a/src/hydra-queue-runner/pool.hh +++ b/src/hydra-queue-runner/pool.hh @@ -57,7 +57,7 @@ public: if (r) state_->idle.push_back(r); } - R * operator -> () { return r; } + R * operator -> () { return r.get(); } R & operator * () { return *r; } }; diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm index 7a68490f..9523ddaa 100644 --- a/src/lib/Hydra/Helper/Nix.pm +++ b/src/lib/Hydra/Helper/Nix.pm @@ -469,6 +469,8 @@ sub restartBuilds($$) { # FIXME: Add this to the API. # FIXME: clear the dependencies? $db->resultset('FailedPaths')->search({ path => [ @paths ]})->delete; + + $db->storage->dbh->do("notify builds_restarted"); }); return scalar(@buildIds); diff --git a/src/script/hydra-evaluator b/src/script/hydra-evaluator index 9d463122..bcdb948e 100755 --- a/src/script/hydra-evaluator +++ b/src/script/hydra-evaluator @@ -246,6 +246,8 @@ sub checkJobsetWrapped { $jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ lastcheckedtime => time }); + + $db->storage->dbh->do("notify builds_added"); }); # Store the error messages for jobs that failed to evaluate.