hydra-evaluator improvements

* The "Jobset" page now shows when evaluations are in progress (rather
  than just pending).

* Restored the ability to do a single evaluation from the command line
  by doing "hydra-evaluator <project> <jobset>".

* Fix some consistency issues between jobset status in PostgreSQL and
  in hydra-evaluator. In particular, "lastCheckedTime" was never
  updated internally.
This commit is contained in:
Eelco Dolstra 2017-03-13 16:19:22 +01:00
parent 242ea8009f
commit 285754aff6
No known key found for this signature in database
GPG key ID: 8170B4726D7198DE
8 changed files with 127 additions and 58 deletions

View file

@ -345,6 +345,7 @@ in
environment = env; environment = env;
serviceConfig = serviceConfig =
{ ExecStart = "@${cfg.package}/bin/hydra-evaluator hydra-evaluator"; { ExecStart = "@${cfg.package}/bin/hydra-evaluator hydra-evaluator";
ExecStopPost = "${cfg.package}/bin/hydra-evaluator --unlock";
User = "hydra"; User = "hydra";
Restart = "always"; Restart = "always";
WorkingDirectory = baseDir; WorkingDirectory = baseDir;

View file

@ -5,18 +5,19 @@
#include <algorithm> #include <algorithm>
#include <thread> #include <thread>
#include <cstring> #include <cstring>
#include <experimental/optional>
#include <sys/types.h> #include <sys/types.h>
#include <sys/wait.h> #include <sys/wait.h>
using namespace nix; using namespace nix;
typedef std::pair<std::string, std::string> JobsetName;
struct Evaluator struct Evaluator
{ {
nix::Pool<Connection> dbPool; nix::Pool<Connection> dbPool;
typedef std::pair<std::string, std::string> JobsetName;
struct Jobset struct Jobset
{ {
JobsetName name; JobsetName name;
@ -27,6 +28,8 @@ struct Evaluator
typedef std::map<JobsetName, Jobset> Jobsets; typedef std::map<JobsetName, Jobset> Jobsets;
std::experimental::optional<JobsetName> evalOne;
size_t maxEvals = 4; size_t maxEvals = 4;
struct State struct State
@ -59,6 +62,8 @@ struct Evaluator
for (auto const & row : res) { for (auto const & row : res) {
auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()}; auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()};
if (evalOne && name != *evalOne) continue;
auto res = state->jobsets.try_emplace(name, Jobset{name}); auto res = state->jobsets.try_emplace(name, Jobset{name});
auto & jobset = res.first->second; auto & jobset = res.first->second;
@ -69,6 +74,11 @@ struct Evaluator
seen.insert(name); seen.insert(name);
} }
if (evalOne && seen.empty()) {
printError("the specified jobset does not exist");
std::_Exit(1);
}
for (auto i = state->jobsets.begin(); i != state->jobsets.end(); ) for (auto i = state->jobsets.begin(); i != state->jobsets.end(); )
if (seen.count(i->first)) if (seen.count(i->first))
++i; ++i;
@ -80,7 +90,23 @@ struct Evaluator
void startEval(State & state, Jobset & jobset) void startEval(State & state, Jobset & jobset)
{ {
printInfo("starting evaluation of jobset %s:%s", jobset.name.first, jobset.name.second); time_t now = time(0);
printInfo("starting evaluation of jobset %s:%s (last checked %d s ago)",
jobset.name.first, jobset.name.second,
now - jobset.lastCheckedTime);
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Jobsets set startTime = $1 where project = $2 and name = $3")
(now)
(jobset.name.first)
(jobset.name.second)
.exec();
txn.commit();
}
assert(jobset.pid == -1); assert(jobset.pid == -1);
@ -93,23 +119,6 @@ struct Evaluator
state.runningEvals++; state.runningEvals++;
childStarted.notify_one(); childStarted.notify_one();
time_t now = time(0);
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized
("update Jobsets set lastCheckedTime = $1, triggerTime = null where project = $2 and name = $3")
(now)
(jobset.name.first)
(jobset.name.second)
.exec();
txn.commit();
jobset.lastCheckedTime = now;
jobset.triggerTime = notTriggered;
}
} }
void startEvals(State & state) void startEvals(State & state)
@ -121,9 +130,10 @@ struct Evaluator
/* Filter out jobsets that have been evaluated recently and have /* Filter out jobsets that have been evaluated recently and have
not been triggered. */ not been triggered. */
for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i) for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i)
if (i->second.pid == -1 && if (evalOne ||
(i->second.triggerTime != std::numeric_limits<time_t>::max() || (i->second.pid == -1 &&
(i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now))) (i->second.triggerTime != std::numeric_limits<time_t>::max() ||
(i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now))))
sorted.push_back(i); sorted.push_back(i);
/* Put jobsets in order of ascending trigger time, last checked /* Put jobsets in order of ascending trigger time, last checked
@ -226,40 +236,83 @@ struct Evaluator
auto state(state_.lock()); auto state(state_.lock());
assert(state->runningEvals); assert(state->runningEvals);
state->runningEvals--; state->runningEvals--;
for (auto & jobset : state->jobsets)
if (jobset.second.pid == pid) { // FIXME: should use a map.
for (auto & i : state->jobsets) {
auto & jobset(i.second);
if (jobset.pid == pid) {
printInfo("evaluation of jobset %s:%s %s", printInfo("evaluation of jobset %s:%s %s",
jobset.first.first, jobset.first.second, statusToString(status)); jobset.name.first, jobset.name.second, statusToString(status));
auto now = time(0);
jobset.triggerTime = notTriggered;
jobset.lastCheckedTime = now;
try { try {
auto conn(dbPool.get());
pqxx::work txn(*conn);
/* Clear the trigger time to prevent this
jobset from getting stuck in an endless
failing eval loop. */
txn.parameterized
("update Jobsets set triggerTime = null where project = $1 and name = $2 and startTime is not null and triggerTime < startTime")
(jobset.name.first)
(jobset.name.second)
.exec();
/* Clear the start time. */
txn.parameterized
("update Jobsets set startTime = null where project = $1 and name = $2")
(jobset.name.first)
(jobset.name.second)
.exec();
if (!WIFEXITED(status) || WEXITSTATUS(status) > 1) { if (!WIFEXITED(status) || WEXITSTATUS(status) > 1) {
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized txn.parameterized
("update Jobsets set errorMsg = $1, errorTime = $2") ("update Jobsets set errorMsg = $1, lastCheckedTime = $2, errorTime = $2, fetchErrorMsg = null where project = $3 and name = $4")
(fmt("evaluation %s", statusToString(status))) (fmt("evaluation %s", statusToString(status)))
(time(0)) (now)
(jobset.name.first)
(jobset.name.second)
.exec(); .exec();
txn.commit();
} }
txn.commit();
} catch (std::exception & e) { } catch (std::exception & e) {
printError("exception setting jobset error: %s", e.what()); printError("exception setting jobset error: %s", e.what());
} }
jobset.second.pid.release(); jobset.pid.release();
maybeDoWork.notify_one(); maybeDoWork.notify_one();
if (evalOne) std::_Exit(0);
break; break;
} }
}
} }
} }
} }
void unlock()
{
auto conn(dbPool.get());
pqxx::work txn(*conn);
txn.parameterized("update Jobsets set startTime = null").exec();
txn.commit();
}
void run() void run()
{ {
unlock();
/* Can't be bothered to shut down cleanly. Goodbye! */ /* Can't be bothered to shut down cleanly. Goodbye! */
auto callback = createInterruptCallback([&]() { std::_Exit(0); }); auto callback = createInterruptCallback([&]() { std::_Exit(1); });
std::thread reaperThread([&]() { reaper(); }); std::thread reaperThread([&]() { reaper(); });
@ -285,10 +338,29 @@ int main(int argc, char * * argv)
signal(SIGTERM, SIG_DFL); signal(SIGTERM, SIG_DFL);
signal(SIGHUP, SIG_DFL); signal(SIGHUP, SIG_DFL);
bool unlock = false;
Evaluator evaluator;
std::vector<std::string> args;
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) { parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
return false; if (*arg == "--unlock")
unlock = true;
else if (hasPrefix(*arg, "-"))
return false;
args.push_back(*arg);
return true;
}); });
Evaluator().run(); if (!args.empty()) {
if (args.size() != 2) throw UsageError("Syntax: hydra-evaluator [<project> <jobset>]");
evaluator.evalOne = JobsetName(args[0], args[1]);
}
if (unlock)
evaluator.unlock();
else
evaluator.run();
}); });
} }

View file

@ -43,7 +43,7 @@ sub jobset_GET {
$c->stash->{evals} = getEvals($self, $c, scalar $c->stash->{jobset}->jobsetevals, 0, 10); $c->stash->{evals} = getEvals($self, $c, scalar $c->stash->{jobset}->jobsetevals, 0, 10);
$c->stash->{latestEval} = $c->stash->{jobset}->jobsetevals->search({}, { rows => 1, order_by => ["id desc"] })->single; $c->stash->{latestEval} = $c->stash->{jobset}->jobsetevals->search({ hasnewbuilds => 1 }, { rows => 1, order_by => ["id desc"] })->single;
$c->stash->{totalShares} = getTotalShares($c->model('DB')->schema); $c->stash->{totalShares} = getTotalShares($c->model('DB')->schema);

View file

@ -134,6 +134,11 @@ __PACKAGE__->table("Jobsets");
data_type: 'boolean' data_type: 'boolean'
is_nullable: 1 is_nullable: 1
=head2 starttime
data_type: 'integer'
is_nullable: 1
=cut =cut
__PACKAGE__->add_columns( __PACKAGE__->add_columns(
@ -173,6 +178,8 @@ __PACKAGE__->add_columns(
{ data_type => "text", is_nullable => 1 }, { data_type => "text", is_nullable => 1 },
"forceeval", "forceeval",
{ data_type => "boolean", is_nullable => 1 }, { data_type => "boolean", is_nullable => 1 },
"starttime",
{ data_type => "integer", is_nullable => 1 },
); );
=head1 PRIMARY KEY =head1 PRIMARY KEY
@ -345,8 +352,8 @@ __PACKAGE__->has_many(
); );
# Created by DBIx::Class::Schema::Loader v0.07045 @ 2016-10-24 20:12:51 # Created by DBIx::Class::Schema::Loader v0.07045 @ 2017-03-09 13:03:05
# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:PSR66NnVRNTMFhDEm10erA # DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:ivYvsUyhEeaeI4EmRQ0/QQ
my %hint = ( my %hint = (
columns => [ columns => [

View file

@ -94,10 +94,15 @@
[% END %] [% END %]
</td> </td>
</tr> </tr>
[% IF jobset.triggertime %] [% IF jobset.starttime %]
<tr>
<th>Evaluation running since:</th>
<td>[% INCLUDE renderRelativeDate timestamp = jobset.starttime %]</td>
</tr>
[% ELSIF jobset.triggertime %]
<tr> <tr>
<th>Evaluation pending since:</th> <th>Evaluation pending since:</th>
<td>[% INCLUDE renderDateTime timestamp = jobset.triggertime %]</td> <td>[% INCLUDE renderRelativeDate timestamp = jobset.triggertime %]</td>
</tr> </tr>
[% END %] [% END %]
</table> </table>

View file

@ -749,13 +749,6 @@ sub checkJobsetWrapped {
sub checkJobset { sub checkJobset {
my ($jobset) = @_; my ($jobset) = @_;
print STDERR "considering jobset ", $jobset->project->name, ":", $jobset->name,
$jobset->lastcheckedtime
? " (last checked " . (time() - $jobset->lastcheckedtime) . "s ago)\n"
: " (never checked)\n";
my $triggerTime = $jobset->triggertime;
my $startTime = clock_gettime(CLOCK_MONOTONIC); my $startTime = clock_gettime(CLOCK_MONOTONIC);
eval { eval {
@ -776,17 +769,6 @@ sub checkJobset {
$failed = 1; $failed = 1;
} }
if (defined $triggerTime) {
txn_do($db, sub {
# Only clear the trigger time if the jobset hasn't been
# triggered in the meantime. In that case, we need to
# evaluate again.
my $new = $jobset->get_from_storage();
$jobset->update({ triggertime => undef })
if $new->triggertime == $triggerTime;
}) if !$dryRun;
}
return $failed; return $failed;
} }

View file

@ -69,6 +69,7 @@ create table Jobsets (
schedulingShares integer not null default 100, schedulingShares integer not null default 100,
fetchErrorMsg text, fetchErrorMsg text,
forceEval boolean, forceEval boolean,
startTime integer, -- if jobset is currently running
check (schedulingShares > 0), check (schedulingShares > 0),
primary key (project, name), primary key (project, name),
foreign key (project) references Projects(name) on delete cascade on update cascade foreign key (project) references Projects(name) on delete cascade on update cascade

1
src/sql/upgrade-53.sql Normal file
View file

@ -0,0 +1 @@
alter table Jobsets add column startTime integer;