diff --git a/src/hydra-queue-runner/queue-monitor.cc b/src/hydra-queue-runner/queue-monitor.cc
index c28d7569..3be08897 100644
--- a/src/hydra-queue-runner/queue-monitor.cc
+++ b/src/hydra-queue-runner/queue-monitor.cc
@@ -26,6 +26,7 @@ void State::queueMonitorLoop()
     receiver buildsCancelled(*conn, "builds_cancelled");
     receiver buildsDeleted(*conn, "builds_deleted");
     receiver buildsBumped(*conn, "builds_bumped");
+    receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
 
     auto store = openStore(); // FIXME: pool
 
@@ -49,6 +50,10 @@
             printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
             processQueueChange(*conn);
         }
 
+        if (jobsetSharesChanged.get()) {
+            printMsg(lvlTalkative, "got notification: jobset shares changed");
+            processJobsetSharesChange(*conn);
+        }
     }
 }
 
@@ -410,12 +415,13 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
 Jobset::ptr State::createJobset(pqxx::work & txn,
     const std::string & projectName, const std::string & jobsetName)
 {
-    auto jobsets_(jobsets.lock());
-
     auto p = std::make_pair(projectName, jobsetName);
 
-    auto i = jobsets_->find(p);
-    if (i != jobsets_->end()) return i->second;
+    {
+        auto jobsets_(jobsets.lock());
+        auto i = jobsets_->find(p);
+        if (i != jobsets_->end()) return i->second;
+    }
 
     auto res = txn.parameterized
         ("select schedulingShares from Jobsets where project = $1 and name = $2")
@@ -423,9 +429,9 @@ Jobset::ptr State::createJobset(pqxx::work & txn,
     if (res.empty()) throw Error("missing jobset - can't happen");
 
     auto shares = res[0]["schedulingShares"].as<unsigned int>();
-    if (shares == 0) shares = 1;
 
-    auto jobset = std::make_shared<Jobset>(shares);
+    auto jobset = std::make_shared<Jobset>();
+    jobset->setShares(shares);
 
     /* Load the build steps from the last 24 hours. */
     res = txn.parameterized
@@ -438,6 +444,23 @@ Jobset::ptr State::createJobset(pqxx::work & txn,
         jobset->addStep(startTime, stopTime - startTime);
     }
 
+    auto jobsets_(jobsets.lock());
+    // Can't happen because only this thread adds to "jobsets".
+    assert(jobsets_->find(p) == jobsets_->end());
     (*jobsets_)[p] = jobset;
     return jobset;
 }
+
+
+void State::processJobsetSharesChange(Connection & conn)
+{
+    /* Get the current set of jobsets. */
+    pqxx::work txn(conn);
+    auto res = txn.exec("select project, name, schedulingShares from Jobsets");
+    for (auto const & row : res) {
+        auto jobsets_(jobsets.lock());
+        auto i = jobsets_->find(std::make_pair(row["project"].as<std::string>(), row["name"].as<std::string>()));
+        if (i == jobsets_->end()) continue;
+        i->second->setShares(row["schedulingShares"].as<unsigned int>());
+    }
+}
diff --git a/src/hydra-queue-runner/state.hh b/src/hydra-queue-runner/state.hh
index 6489294e..060e2d1c 100644
--- a/src/hydra-queue-runner/state.hh
+++ b/src/hydra-queue-runner/state.hh
@@ -67,14 +67,12 @@ public:
     typedef std::shared_ptr<Jobset> ptr;
     typedef std::weak_ptr<Jobset> wptr;
 
-    Jobset(unsigned int shares) : shares(shares) { }
-
     static const time_t schedulingWindow = 24 * 60 * 60;
 
 private:
 
     std::atomic<time_t> seconds{0};
-    std::atomic<unsigned int> shares;
+    std::atomic<unsigned int> shares{1};
 
     /* The start time and duration of the most recent build steps. */
     Sync<std::map<time_t, time_t>> steps;
@@ -86,6 +84,12 @@ public:
         return (double) seconds / shares;
     }
 
+    void setShares(int shares_)
+    {
+        assert(shares_ > 0);
+        shares = shares_;
+    }
+
     time_t getSeconds() { return seconds; }
 
     void addStep(time_t startTime, time_t duration);
@@ -354,6 +358,8 @@ private:
     Jobset::ptr createJobset(pqxx::work & txn,
         const std::string & projectName, const std::string & jobsetName);
 
+    void processJobsetSharesChange(Connection & conn);
+
     void makeRunnable(Step::ptr step);
 
     /* The thread that selects and starts runnable builds. */
diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql
index ae66f2ee..b6710af7 100644
--- a/src/sql/hydra.sql
+++ b/src/sql/hydra.sql
@@ -64,6 +64,7 @@ create table Jobsets (
     checkInterval integer not null default 300, -- minimum time in seconds between polls (0 = disable polling)
     schedulingShares integer not null default 100,
     fetchErrorMsg text,
+    check (schedulingShares > 0),
     primary key (project, name),
     foreign key (project) references Projects(name) on delete cascade on update cascade
 #ifdef SQLITE
@@ -72,6 +73,14 @@ create table Jobsets (
 #endif
 );
 
+#ifdef POSTGRESQL
+
+create function notifyJobsetSharesChanged() returns trigger as 'begin notify jobset_shares_changed; return null; end;' language plpgsql;
+create trigger JobsetSharesChanged after update on Jobsets for each row
+  when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
+
+#endif
+
 
 create table JobsetRenames (
     project       text not null,
diff --git a/src/sql/upgrade-41.sql b/src/sql/upgrade-41.sql
new file mode 100644
index 00000000..509a5351
--- /dev/null
+++ b/src/sql/upgrade-41.sql
@@ -0,0 +1,5 @@
+create function notifyJobsetSharesChanged() returns trigger as 'begin notify jobset_shares_changed; return null; end;' language plpgsql;
+create trigger JobsetSharesChanged after update on Jobsets for each row
+  when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
+
+alter table Jobsets add constraint jobsets_check check (schedulingShares > 0);
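
Not part of the patch: a minimal sketch of how the new notification path can be exercised from psql against a Hydra PostgreSQL database with this schema and trigger applied. The project and jobset names below are placeholders.

    -- session 1: subscribe to the channel the queue runner listens on
    listen jobset_shares_changed;

    -- session 2: change a jobset's share allocation; the trigger only fires
    -- when the value actually changes
    update Jobsets set schedulingShares = 200
        where project = 'myproject' and name = 'mytrunk';

    -- session 2: values of zero or below are now rejected by the check
    -- constraint instead of being clamped to 1 by the queue runner
    update Jobsets set schedulingShares = 0
        where project = 'myproject' and name = 'mytrunk';

Session 1 receives an asynchronous notification on "jobset_shares_changed" (psql prints it after its next statement); on that notification the queue runner calls processJobsetSharesChange() and applies the new value to the in-memory Jobset.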