From c08883966c55c8063725ec374de560c0242e64bd Mon Sep 17 00:00:00 2001 From: Eelco Dolstra Date: Thu, 11 Jun 2015 17:38:55 +0200 Subject: [PATCH] Use PostgreSQL notifications for queue events Hydra-queue-runner no longer polls the queue periodically, but instead sleeps until it receives a notification from PostgreSQL about a change to the queue (build added, build cancelled or build restarted). Also, for the "build added" case, we now only check for builds with an ID greater than the previous greatest ID. This is much more efficient if the queue is large. --- src/hydra-queue-runner/hydra-queue-runner.cc | 56 +++++++++++++++----- src/hydra-queue-runner/pool.hh | 2 +- src/lib/Hydra/Helper/Nix.pm | 2 + src/script/hydra-evaluator | 2 + 4 files changed, 48 insertions(+), 14 deletions(-) diff --git a/src/hydra-queue-runner/hydra-queue-runner.cc b/src/hydra-queue-runner/hydra-queue-runner.cc index 9e5553b6..4e0b9c53 100644 --- a/src/hydra-queue-runner/hydra-queue-runner.cc +++ b/src/hydra-queue-runner/hydra-queue-runner.cc @@ -156,10 +156,6 @@ private: Path hydraData, logDir; - /* CV for waking up the queue. */ - std::condition_variable queueMonitorWakeup; - std::mutex queueMonitorMutex; - /* The queued builds. 
*/ typedef std::map Builds; Sync builds; @@ -214,7 +210,7 @@ public: void queueMonitor(); - void getQueuedBuilds(std::shared_ptr store); + void getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId); Step::ptr createStep(std::shared_ptr store, const Path & drvPath, std::set & newRunnable); @@ -366,14 +362,47 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime, void State::queueMonitor() { + auto conn(dbPool.get()); + + struct receiver : public pqxx::notification_receiver + { + bool status = false; + receiver(pqxx::connection_base & c, const std::string & channel) + : pqxx::notification_receiver(c, channel) { } + void operator() (const string & payload, int pid) override + { + status = true; + }; + bool get() { + bool b = status; + status = false; + return b; + } + }; + + receiver buildsAdded(*conn, "builds_added"); + receiver buildsRestarted(*conn, "builds_restarted"); + receiver buildsCancelled(*conn, "builds_cancelled"); + auto store = openStore(); // FIXME: pool - while (true) { - getQueuedBuilds(store); + unsigned int lastBuildId = 0; - { - std::unique_lock lock(queueMonitorMutex); - queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5)); + while (true) { + getQueuedBuilds(store, lastBuildId); + + /* Sleep until we get notification from the database about an + event. 
*/ + conn->await_notification(); + + if (buildsAdded.get()) + printMsg(lvlError, "got notification: new builds added to the queue"); + if (buildsRestarted.get()) { + printMsg(lvlError, "got notification: builds restarted"); + lastBuildId = 0; // check all builds + } + if (buildsCancelled.get()) { + printMsg(lvlError, "got notification: builds cancelled"); } } @@ -381,9 +410,9 @@ void State::queueMonitor() } -void State::getQueuedBuilds(std::shared_ptr store) +void State::getQueuedBuilds(std::shared_ptr store, unsigned int & lastBuildId) { - printMsg(lvlError, "checking the queue..."); + printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId); auto conn(dbPool.get()); @@ -396,11 +425,12 @@ void State::getQueuedBuilds(std::shared_ptr store) // FIXME: query only builds with ID higher than the previous // highest. - auto res = txn.exec("select * from Builds where finished = 0 order by id"); + auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec(); for (auto const & row : res) { auto builds_(builds.lock()); BuildID id = row["id"].as(); + if (id > lastBuildId) lastBuildId = id; if (has(*builds_, id)) continue; auto build = std::make_shared(); diff --git a/src/hydra-queue-runner/pool.hh b/src/hydra-queue-runner/pool.hh index 0a58ebe0..a1cd3977 100644 --- a/src/hydra-queue-runner/pool.hh +++ b/src/hydra-queue-runner/pool.hh @@ -57,7 +57,7 @@ public: if (r) state_->idle.push_back(r); } - R * operator -> () { return r; } + R * operator -> () { return r.get(); } R & operator * () { return *r; } }; diff --git a/src/lib/Hydra/Helper/Nix.pm b/src/lib/Hydra/Helper/Nix.pm index 7a68490f..9523ddaa 100644 --- a/src/lib/Hydra/Helper/Nix.pm +++ b/src/lib/Hydra/Helper/Nix.pm @@ -469,6 +469,8 @@ sub restartBuilds($$) { # FIXME: Add this to the API. # FIXME: clear the dependencies? 
$db->resultset('FailedPaths')->search({ path => [ @paths ]})->delete; + + $db->storage->dbh->do("notify builds_restarted"); }); return scalar(@buildIds); diff --git a/src/script/hydra-evaluator b/src/script/hydra-evaluator index 9d463122..bcdb948e 100755 --- a/src/script/hydra-evaluator +++ b/src/script/hydra-evaluator @@ -246,6 +246,8 @@ sub checkJobsetWrapped { $jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ lastcheckedtime => time }); + + $db->storage->dbh->do("notify builds_added"); }); # Store the error messages for jobs that failed to evaluate.