hydra-queue-runner: Detect changes to the scheduling shares
This commit is contained in:
parent
2e3899ed27
commit
d4759c1da2
4 changed files with 52 additions and 9 deletions
|
@ -26,6 +26,7 @@ void State::queueMonitorLoop()
|
||||||
receiver buildsCancelled(*conn, "builds_cancelled");
|
receiver buildsCancelled(*conn, "builds_cancelled");
|
||||||
receiver buildsDeleted(*conn, "builds_deleted");
|
receiver buildsDeleted(*conn, "builds_deleted");
|
||||||
receiver buildsBumped(*conn, "builds_bumped");
|
receiver buildsBumped(*conn, "builds_bumped");
|
||||||
|
receiver jobsetSharesChanged(*conn, "jobset_shares_changed");
|
||||||
|
|
||||||
auto store = openStore(); // FIXME: pool
|
auto store = openStore(); // FIXME: pool
|
||||||
|
|
||||||
|
@ -49,6 +50,10 @@ void State::queueMonitorLoop()
|
||||||
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
|
printMsg(lvlTalkative, "got notification: builds cancelled or bumped");
|
||||||
processQueueChange(*conn);
|
processQueueChange(*conn);
|
||||||
}
|
}
|
||||||
|
if (jobsetSharesChanged.get()) {
|
||||||
|
printMsg(lvlTalkative, "got notification: jobset shares changed");
|
||||||
|
processJobsetSharesChange(*conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -410,12 +415,13 @@ Step::ptr State::createStep(std::shared_ptr<StoreAPI> store, const Path & drvPat
|
||||||
Jobset::ptr State::createJobset(pqxx::work & txn,
|
Jobset::ptr State::createJobset(pqxx::work & txn,
|
||||||
const std::string & projectName, const std::string & jobsetName)
|
const std::string & projectName, const std::string & jobsetName)
|
||||||
{
|
{
|
||||||
auto jobsets_(jobsets.lock());
|
|
||||||
|
|
||||||
auto p = std::make_pair(projectName, jobsetName);
|
auto p = std::make_pair(projectName, jobsetName);
|
||||||
|
|
||||||
|
{
|
||||||
|
auto jobsets_(jobsets.lock());
|
||||||
auto i = jobsets_->find(p);
|
auto i = jobsets_->find(p);
|
||||||
if (i != jobsets_->end()) return i->second;
|
if (i != jobsets_->end()) return i->second;
|
||||||
|
}
|
||||||
|
|
||||||
auto res = txn.parameterized
|
auto res = txn.parameterized
|
||||||
("select schedulingShares from Jobsets where project = $1 and name = $2")
|
("select schedulingShares from Jobsets where project = $1 and name = $2")
|
||||||
|
@ -423,9 +429,9 @@ Jobset::ptr State::createJobset(pqxx::work & txn,
|
||||||
if (res.empty()) throw Error("missing jobset - can't happen");
|
if (res.empty()) throw Error("missing jobset - can't happen");
|
||||||
|
|
||||||
auto shares = res[0]["schedulingShares"].as<unsigned int>();
|
auto shares = res[0]["schedulingShares"].as<unsigned int>();
|
||||||
if (shares == 0) shares = 1;
|
|
||||||
|
|
||||||
auto jobset = std::make_shared<Jobset>(shares);
|
auto jobset = std::make_shared<Jobset>();
|
||||||
|
jobset->setShares(shares);
|
||||||
|
|
||||||
/* Load the build steps from the last 24 hours. */
|
/* Load the build steps from the last 24 hours. */
|
||||||
res = txn.parameterized
|
res = txn.parameterized
|
||||||
|
@ -438,6 +444,23 @@ Jobset::ptr State::createJobset(pqxx::work & txn,
|
||||||
jobset->addStep(startTime, stopTime - startTime);
|
jobset->addStep(startTime, stopTime - startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto jobsets_(jobsets.lock());
|
||||||
|
// Can't happen because only this thread adds to "jobsets".
|
||||||
|
assert(jobsets_->find(p) == jobsets_->end());
|
||||||
(*jobsets_)[p] = jobset;
|
(*jobsets_)[p] = jobset;
|
||||||
return jobset;
|
return jobset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void State::processJobsetSharesChange(Connection & conn)
|
||||||
|
{
|
||||||
|
/* Get the current set of jobsets. */
|
||||||
|
pqxx::work txn(conn);
|
||||||
|
auto res = txn.exec("select project, name, schedulingShares from Jobsets");
|
||||||
|
for (auto const & row : res) {
|
||||||
|
auto jobsets_(jobsets.lock());
|
||||||
|
auto i = jobsets_->find(std::make_pair(row["project"].as<string>(), row["name"].as<string>()));
|
||||||
|
if (i == jobsets_->end()) continue;
|
||||||
|
i->second->setShares(row["schedulingShares"].as<unsigned int>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -67,14 +67,12 @@ public:
|
||||||
typedef std::shared_ptr<Jobset> ptr;
|
typedef std::shared_ptr<Jobset> ptr;
|
||||||
typedef std::weak_ptr<Jobset> wptr;
|
typedef std::weak_ptr<Jobset> wptr;
|
||||||
|
|
||||||
Jobset(unsigned int shares) : shares(shares) { }
|
|
||||||
|
|
||||||
static const time_t schedulingWindow = 24 * 60 * 60;
|
static const time_t schedulingWindow = 24 * 60 * 60;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
std::atomic<time_t> seconds{0};
|
std::atomic<time_t> seconds{0};
|
||||||
std::atomic<unsigned int> shares;
|
std::atomic<unsigned int> shares{1};
|
||||||
|
|
||||||
/* The start time and duration of the most recent build steps. */
|
/* The start time and duration of the most recent build steps. */
|
||||||
Sync<std::map<time_t, time_t>> steps;
|
Sync<std::map<time_t, time_t>> steps;
|
||||||
|
@ -86,6 +84,12 @@ public:
|
||||||
return (double) seconds / shares;
|
return (double) seconds / shares;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setShares(int shares_)
|
||||||
|
{
|
||||||
|
assert(shares_ > 0);
|
||||||
|
shares = shares_;
|
||||||
|
}
|
||||||
|
|
||||||
time_t getSeconds() { return seconds; }
|
time_t getSeconds() { return seconds; }
|
||||||
|
|
||||||
void addStep(time_t startTime, time_t duration);
|
void addStep(time_t startTime, time_t duration);
|
||||||
|
@ -354,6 +358,8 @@ private:
|
||||||
Jobset::ptr createJobset(pqxx::work & txn,
|
Jobset::ptr createJobset(pqxx::work & txn,
|
||||||
const std::string & projectName, const std::string & jobsetName);
|
const std::string & projectName, const std::string & jobsetName);
|
||||||
|
|
||||||
|
void processJobsetSharesChange(Connection & conn);
|
||||||
|
|
||||||
void makeRunnable(Step::ptr step);
|
void makeRunnable(Step::ptr step);
|
||||||
|
|
||||||
/* The thread that selects and starts runnable builds. */
|
/* The thread that selects and starts runnable builds. */
|
||||||
|
|
|
@ -64,6 +64,7 @@ create table Jobsets (
|
||||||
checkInterval integer not null default 300, -- minimum time in seconds between polls (0 = disable polling)
|
checkInterval integer not null default 300, -- minimum time in seconds between polls (0 = disable polling)
|
||||||
schedulingShares integer not null default 100,
|
schedulingShares integer not null default 100,
|
||||||
fetchErrorMsg text,
|
fetchErrorMsg text,
|
||||||
|
check schedulingShares > 0,
|
||||||
primary key (project, name),
|
primary key (project, name),
|
||||||
foreign key (project) references Projects(name) on delete cascade on update cascade
|
foreign key (project) references Projects(name) on delete cascade on update cascade
|
||||||
#ifdef SQLITE
|
#ifdef SQLITE
|
||||||
|
@ -72,6 +73,14 @@ create table Jobsets (
|
||||||
#endif
|
#endif
|
||||||
);
|
);
|
||||||
|
|
||||||
|
#ifdef POSTGRESQL
|
||||||
|
|
||||||
|
create function notifyJobsetSharesChanged() returns trigger as 'begin notify jobset_shares_changed; return null; end;' language plpgsql;
|
||||||
|
create trigger JobsetSharesChanged after update on Jobsets for each row
|
||||||
|
when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
create table JobsetRenames (
|
create table JobsetRenames (
|
||||||
project text not null,
|
project text not null,
|
||||||
|
|
5
src/sql/upgrade-41.sql
Normal file
5
src/sql/upgrade-41.sql
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
create function notifyJobsetSharesChanged() returns trigger as 'begin notify jobset_shares_changed; return null; end;' language plpgsql;
|
||||||
|
create trigger JobsetSharesChanged after update on Jobsets for each row
|
||||||
|
when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
|
||||||
|
|
||||||
|
alter table Jobsets add constraint jobsets_check check (schedulingShares > 0);
|
Loading…
Reference in a new issue