queue-runner: remove id > X from new builds query

Running the query with/without it shows that it makes no difference to
postgres, since there's an index on finished=0 already. This allows a
few simplifications, but also paves the way towards running multiple
parallel monitor threads in the future.
This commit is contained in:
Pierre Bourdon 2024-04-20 16:53:52 +02:00
parent cc6bafe538
commit 54f8daf6b1
Signed by untrusted user: delroth
GPG key ID: 6FB80DCD84DA0F1C
3 changed files with 5 additions and 27 deletions

View file

@ -70,13 +70,6 @@ State::PromMetrics::PromMetrics()
.Register(*registry) .Register(*registry)
.Add({}) .Add({})
) )
, queue_max_id(
prometheus::BuildGauge()
.Name("hydraqueuerunner_queue_max_build_id_info")
.Help("Maximum build record ID in the queue")
.Register(*registry)
.Add({})
)
, dispatcher_time_spent_running( , dispatcher_time_spent_running(
prometheus::BuildCounter() prometheus::BuildCounter()
.Name("hydraqueuerunner_dispatcher_time_spent_running") .Name("hydraqueuerunner_dispatcher_time_spent_running")

View file

@ -37,15 +37,13 @@ void State::queueMonitorLoop(Connection & conn)
auto destStore = getDestStore(); auto destStore = getDestStore();
unsigned int lastBuildId = 0;
bool quit = false; bool quit = false;
while (!quit) { while (!quit) {
auto t_before_work = std::chrono::steady_clock::now(); auto t_before_work = std::chrono::steady_clock::now();
localStore->clearPathInfoCache(); localStore->clearPathInfoCache();
bool done = getQueuedBuilds(conn, destStore, lastBuildId); bool done = getQueuedBuilds(conn, destStore);
if (buildOne && buildOneDone) quit = true; if (buildOne && buildOneDone) quit = true;
@ -63,12 +61,10 @@ void State::queueMonitorLoop(Connection & conn)
conn.get_notifs(); conn.get_notifs();
if (auto lowestId = buildsAdded.get()) { if (auto lowestId = buildsAdded.get()) {
lastBuildId = std::min(lastBuildId, static_cast<unsigned>(std::stoul(*lowestId) - 1));
printMsg(lvlTalkative, "got notification: new builds added to the queue"); printMsg(lvlTalkative, "got notification: new builds added to the queue");
} }
if (buildsRestarted.get()) { if (buildsRestarted.get()) {
printMsg(lvlTalkative, "got notification: builds restarted"); printMsg(lvlTalkative, "got notification: builds restarted");
lastBuildId = 0; // check all builds
} }
if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) { if (buildsCancelled.get() || buildsDeleted.get() || buildsBumped.get()) {
printMsg(lvlTalkative, "got notification: builds cancelled or bumped"); printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
@ -95,11 +91,11 @@ struct PreviousFailure : public std::exception {
bool State::getQueuedBuilds(Connection & conn, bool State::getQueuedBuilds(Connection & conn,
ref<Store> destStore, unsigned int & lastBuildId) ref<Store> destStore)
{ {
prom.queue_checks_started.Increment(); prom.queue_checks_started.Increment();
printInfo("checking the queue for builds > %d...", lastBuildId); printInfo("checking the queue for builds...");
/* Grab the queued builds from the database, but don't process /* Grab the queued builds from the database, but don't process
them yet (since we don't want a long-running transaction). */ them yet (since we don't want a long-running transaction). */
@ -107,8 +103,6 @@ bool State::getQueuedBuilds(Connection & conn,
std::map<BuildID, Build::ptr> newBuildsByID; std::map<BuildID, Build::ptr> newBuildsByID;
std::multimap<StorePath, BuildID> newBuildsByPath; std::multimap<StorePath, BuildID> newBuildsByPath;
unsigned int newLastBuildId = lastBuildId;
{ {
pqxx::work txn(conn); pqxx::work txn(conn);
@ -117,17 +111,12 @@ bool State::getQueuedBuilds(Connection & conn,
"jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, " "jobsets.name as jobset, job, drvPath, maxsilent, timeout, timestamp, "
"globalPriority, priority from Builds " "globalPriority, priority from Builds "
"inner join jobsets on builds.jobset_id = jobsets.id " "inner join jobsets on builds.jobset_id = jobsets.id "
"where builds.id > $1 and finished = 0 order by globalPriority desc, builds.id", "where finished = 0 order by globalPriority desc, builds.id");
lastBuildId);
for (auto const & row : res) { for (auto const & row : res) {
auto builds_(builds.lock()); auto builds_(builds.lock());
BuildID id = row["id"].as<BuildID>(); BuildID id = row["id"].as<BuildID>();
if (buildOne && id != buildOne) continue; if (buildOne && id != buildOne) continue;
if (id > newLastBuildId) {
newLastBuildId = id;
prom.queue_max_id.Set(id);
}
if (builds_->count(id)) continue; if (builds_->count(id)) continue;
auto build = std::make_shared<Build>( auto build = std::make_shared<Build>(
@ -336,8 +325,6 @@ bool State::getQueuedBuilds(Connection & conn,
} }
prom.queue_checks_finished.Increment(); prom.queue_checks_finished.Increment();
lastBuildId = newBuildsByID.empty() ? newLastBuildId : newBuildsByID.begin()->first - 1;
return newBuildsByID.empty(); return newBuildsByID.empty();
} }

View file

@ -490,7 +490,6 @@ private:
prometheus::Counter& queue_steps_created; prometheus::Counter& queue_steps_created;
prometheus::Counter& queue_checks_early_exits; prometheus::Counter& queue_checks_early_exits;
prometheus::Counter& queue_checks_finished; prometheus::Counter& queue_checks_finished;
prometheus::Gauge& queue_max_id;
prometheus::Counter& dispatcher_time_spent_running; prometheus::Counter& dispatcher_time_spent_running;
prometheus::Counter& dispatcher_time_spent_waiting; prometheus::Counter& dispatcher_time_spent_waiting;
@ -546,8 +545,7 @@ private:
void queueMonitorLoop(Connection & conn); void queueMonitorLoop(Connection & conn);
/* Check the queue for new builds. */ /* Check the queue for new builds. */
bool getQueuedBuilds(Connection & conn, bool getQueuedBuilds(Connection & conn, nix::ref<nix::Store> destStore);
nix::ref<nix::Store> destStore, unsigned int & lastBuildId);
/* Handle cancellation, deletion and priority bumps. */ /* Handle cancellation, deletion and priority bumps. */
void processQueueChange(Connection & conn); void processQueueChange(Connection & conn);