getQueuedBuilds(): Periodically stop to handle priority bumps

Previously, priority bumps could take a long time to get noticed if
getQueuedBuilds() was busy processing zillions of queue
additions. (This was made worse by the reintroduction of substitute
checking.)
This commit is contained in:
Eelco Dolstra 2015-10-22 17:00:46 +02:00
parent 71bf7e02d5
commit 53c80d9526
2 changed files with 20 additions and 6 deletions

View file

@ -36,12 +36,15 @@ void State::queueMonitorLoop()
unsigned int lastBuildId = 0; unsigned int lastBuildId = 0;
while (true) { while (true) {
getQueuedBuilds(*conn, store, lastBuildId); bool done = getQueuedBuilds(*conn, store, lastBuildId);
/* Sleep until we get notification from the database about an /* Sleep until we get notification from the database about an
event. */ event. */
conn->await_notification(); if (done) {
nrQueueWakeups++; conn->await_notification();
nrQueueWakeups++;
} else
conn->get_notifs();
if (buildsAdded.get()) if (buildsAdded.get())
printMsg(lvlTalkative, "got notification: new builds added to the queue"); printMsg(lvlTalkative, "got notification: new builds added to the queue");
@ -61,7 +64,7 @@ void State::queueMonitorLoop()
} }
void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId) bool State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store, unsigned int & lastBuildId)
{ {
printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId);
@ -71,6 +74,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
std::map<BuildID, Build::ptr> newBuildsByID; std::map<BuildID, Build::ptr> newBuildsByID;
std::multimap<Path, BuildID> newBuildsByPath; std::multimap<Path, BuildID> newBuildsByPath;
unsigned int newLastBuildId = lastBuildId;
{ {
pqxx::work txn(conn); pqxx::work txn(conn);
@ -83,7 +88,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
auto builds_(builds.lock()); auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>(); BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue; if (buildOne && id != buildOne) continue;
if (id > lastBuildId) lastBuildId = id; if (id > newLastBuildId) newLastBuildId = id;
if (has(*builds_, id)) continue; if (has(*builds_, id)) continue;
auto build = std::make_shared<Build>(); auto build = std::make_shared<Build>();
@ -229,6 +234,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
/* Now instantiate build steps for each new build. The builder /* Now instantiate build steps for each new build. The builder
threads can start building the runnable build steps right away, threads can start building the runnable build steps right away,
even while we're still processing other new builds. */ even while we're still processing other new builds. */
system_time start = std::chrono::system_clock::now();
for (auto id : newIDs) { for (auto id : newIDs) {
auto i = newBuildsByID.find(id); auto i = newBuildsByID.find(id);
if (i == newBuildsByID.end()) continue; if (i == newBuildsByID.end()) continue;
@ -250,7 +257,14 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr<StoreAPI> store,
makeRunnable(r); makeRunnable(r);
nrBuildsRead += nrAdded; nrBuildsRead += nrAdded;
/* Stop after a certain time to allow priority bumps to be
processed. */
if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) break;
} }
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
return newBuildsByID.empty();
} }

View file

@ -377,7 +377,7 @@ private:
void queueMonitorLoop(); void queueMonitorLoop();
/* Check the queue for new builds. */ /* Check the queue for new builds. */
void getQueuedBuilds(Connection & conn, std::shared_ptr<nix::StoreAPI> store, unsigned int & lastBuildId); bool getQueuedBuilds(Connection & conn, std::shared_ptr<nix::StoreAPI> store, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);