diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc index 0694facc..3f3d55f2 100644 --- a/src/hydra-queue-runner/queue-monitor.cc +++ b/src/hydra-queue-runner/queue-monitor.cc @@ -36,12 +36,15 @@ void State::queueMonitorLoop() unsigned int lastBuildId = 0; while (true) { - getQueuedBuilds(*conn, store, lastBuildId); + bool done = getQueuedBuilds(*conn, store, lastBuildId); /* Sleep until we get notification from the database about an event. */ - conn->await_notification(); - nrQueueWakeups++; + if (done) { + conn->await_notification(); + nrQueueWakeups++; + } else + conn->get_notifs(); if (buildsAdded.get()) printMsg(lvlTalkative, "got notification: new builds added to the queue"); @@ -61,7 +64,7 @@ void State::queueMonitorLoop() } -void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) +bool State::getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId) { printMsg(lvlInfo, format("checking the queue for builds > %1%...") % lastBuildId); @@ -71,6 +74,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, std::map newBuildsByID; std::multimap newBuildsByPath; + unsigned int newLastBuildId = lastBuildId; + { pqxx::work txn(conn); @@ -83,7 +88,7 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, auto builds_(builds.lock()); BuildID id = row["id"].as(); if (buildOne && id != buildOne) continue; - if (id > lastBuildId) lastBuildId = id; + if (id > newLastBuildId) newLastBuildId = id; if (has(*builds_, id)) continue; auto build = std::make_shared(); @@ -229,6 +234,8 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, /* Now instantiate build steps for each new build. The builder threads can start building the runnable build steps right away, even while we're still processing other new builds. */ + system_time start = std::chrono::system_clock::now(); + for (auto id : newIDs) { auto i = newBuildsByID.find(id); if (i == newBuildsByID.end()) continue; @@ -250,7 +257,14 @@ void State::getQueuedBuilds(Connection & conn, std::shared_ptr store, makeRunnable(r); nrBuildsRead += nrAdded; + + /* Stop after a certain time to allow priority bumps to be + processed. */ + if (std::chrono::system_clock::now() > start + std::chrono::seconds(600)) break; } + + lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1; + return newBuildsByID.empty(); } diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh index fda04192..1a988076 100644 --- a/src/hydra-queue-runner/state.hh +++ b/src/hydra-queue-runner/state.hh @@ -377,7 +377,7 @@ private: void queueMonitorLoop(); /* Check the queue for new builds. */ - void getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); + bool getQueuedBuilds(Connection & conn, std::shared_ptr store, unsigned int & lastBuildId); /* Handle cancellation, deletion and priority bumps. */ void processQueueChange(Connection & conn);