Use PostgreSQL notifications for queue events

Hydra-queue-runner no longer polls the queue periodically. Instead, it
sleeps until it receives a notification from PostgreSQL about a change
to the queue (build added, build cancelled, or build restarted).

Also, for the "build added" case, we now only check for builds with an
ID greater than the previous greatest ID. This is much more efficient
if the queue is large.
This commit is contained in:
Eelco Dolstra 2015-06-11 17:38:55 +02:00
parent d72a88b562
commit c08883966c
4 changed files with 48 additions and 14 deletions

View file

@ -156,10 +156,6 @@ private:
Path hydraData, logDir; Path hydraData, logDir;
/* CV for waking up the queue. */
std::condition_variable queueMonitorWakeup;
std::mutex queueMonitorMutex;
/* The queued builds. */ /* The queued builds. */
typedef std::map<BuildID, Build::ptr> Builds; typedef std::map<BuildID, Build::ptr> Builds;
Sync<Builds> builds; Sync<Builds> builds;
@ -214,7 +210,7 @@ public:
void queueMonitor(); void queueMonitor();
void getQueuedBuilds(std::shared_ptr<StoreAPI> store); void getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId);
Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath, Step::ptr createStep(std::shared_ptr<StoreAPI> store, const Path & drvPath,
std::set<Step::ptr> & newRunnable); std::set<Step::ptr> & newRunnable);
@ -366,14 +362,47 @@ void State::finishBuildStep(pqxx::work & txn, time_t startTime, time_t stopTime,
void State::queueMonitor() void State::queueMonitor()
{ {
auto conn(dbPool.get());
/* Notification receiver for a single channel. It keeps a flag that
   is raised when the receiver is invoked and lowered again when the
   flag is read through get(). */
struct receiver : public pqxx::notification_receiver
{
    /* True if the receiver has been invoked since the last get(). */
    bool status = false;

    /* Construct the underlying pqxx receiver on connection `c' for
       `channel'. */
    receiver(pqxx::connection_base & c, const std::string & channel)
        : pqxx::notification_receiver(c, channel)
    {
    }

    /* Override of the base class call operator. Only the fact that it
       was called is recorded; `payload' and `pid' are not used. */
    void operator() (const string & payload, int pid) override
    {
        status = true;
    }

    /* Return the current value of the flag and clear it. */
    bool get()
    {
        const bool seen = status;
        status = false;
        return seen;
    }
};
receiver buildsAdded(*conn, "builds_added");
receiver buildsRestarted(*conn, "builds_restarted");
receiver buildsCancelled(*conn, "builds_cancelled");
auto store = openStore(); // FIXME: pool auto store = openStore(); // FIXME: pool
while (true) { unsigned int lastBuildId = 0;
getQueuedBuilds(store);
{ while (true) {
std::unique_lock<std::mutex> lock(queueMonitorMutex); getQueuedBuilds(store, lastBuildId);
queueMonitorWakeup.wait_for(lock, std::chrono::seconds(5));
/* Sleep until we get notification from the database about an
event. */
conn->await_notification();
if (buildsAdded.get())
printMsg(lvlError, "got notification: new builds added to the queue");
if (buildsRestarted.get()) {
printMsg(lvlError, "got notification: builds restarted");
lastBuildId = 0; // check all builds
}
if (buildsCancelled.get()) {
printMsg(lvlError, "got notification: builds cancelled");
} }
} }
@ -381,9 +410,9 @@ void State::queueMonitor()
} }
void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store) void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{ {
printMsg(lvlError, "checking the queue..."); printMsg(lvlError, format("checking the queue for builds > %1%...") % lastBuildId);
auto conn(dbPool.get()); auto conn(dbPool.get());
@ -396,11 +425,12 @@ void State::getQueuedBuilds(std::shared_ptr<StoreAPI> store)
// FIXME: query only builds with ID higher than the previous // FIXME: query only builds with ID higher than the previous
// highest. // highest.
auto res = txn.exec("select * from Builds where finished = 0 order by id"); auto res = txn.parameterized("select * from Builds where id > $1 and finished = 0 order by id")(lastBuildId).exec();
for (auto const & row : res) { for (auto const & row : res) {
auto builds_(builds.lock()); auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>(); BuildID id = row["id"].as<BuildID>();
if (id > lastBuildId) lastBuildId = id;
if (has(*builds_, id)) continue; if (has(*builds_, id)) continue;
auto build = std::make_shared<Build>(); auto build = std::make_shared<Build>();

View file

@ -57,7 +57,7 @@ public:
if (r) state_->idle.push_back(r); if (r) state_->idle.push_back(r);
} }
R * operator -> () { return r; } R * operator -> () { return r.get(); }
R & operator * () { return *r; } R & operator * () { return *r; }
}; };

View file

@ -469,6 +469,8 @@ sub restartBuilds($$) {
# FIXME: Add this to the API. # FIXME: Add this to the API.
# FIXME: clear the dependencies? # FIXME: clear the dependencies?
$db->resultset('FailedPaths')->search({ path => [ @paths ]})->delete; $db->resultset('FailedPaths')->search({ path => [ @paths ]})->delete;
$db->storage->dbh->do("notify builds_restarted");
}); });
return scalar(@buildIds); return scalar(@buildIds);

View file

@ -246,6 +246,8 @@ sub checkJobsetWrapped {
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
$jobset->update({ lastcheckedtime => time }); $jobset->update({ lastcheckedtime => time });
$db->storage->dbh->do("notify builds_added");
}); });
# Store the error messages for jobs that failed to evaluate. # Store the error messages for jobs that failed to evaluate.