Concurrent hydra-evaluator

This rewrites the top-level loop of hydra-evaluator in C++. The Perl
stuff is moved into hydra-eval-jobset. (Rewriting the entire evaluator
would be nice but is a bit too much work.) The new version has some
advantages:

* It can run multiple jobset evaluations in parallel.

* It uses PostgreSQL notifications so it doesn't have to poll the
  database. So if a jobset is triggered via the web interface or from
  a GitHub / Bitbucket webhook, evaluation of the jobset will start
  almost instantaneously (assuming the evaluator is not at its
  concurrency limit).

* It imposes a timeout on evaluations. So if e.g. hydra-eval-jobset
  hangs connecting to a Mercurial server, it will eventually be
  killed.
This commit is contained in:
Eelco Dolstra 2016-10-13 15:53:05 +02:00
parent 16feddd5d4
commit e0b2921ff2
15 changed files with 325 additions and 55 deletions

View file

@ -11,7 +11,7 @@ AC_PROG_LN_S
AC_PROG_LIBTOOL AC_PROG_LIBTOOL
AC_PROG_CXX AC_PROG_CXX
CXXFLAGS+=" -std=c++11" CXXFLAGS+=" -std=c++17"
AC_PATH_PROG([XSLTPROC], [xsltproc]) AC_PATH_PROG([XSLTPROC], [xsltproc])
@ -70,6 +70,7 @@ AC_CONFIG_FILES([
doc/Makefile doc/Makefile
doc/manual/Makefile doc/manual/Makefile
src/Makefile src/Makefile
src/hydra-evaluator/Makefile
src/hydra-eval-jobs/Makefile src/hydra-eval-jobs/Makefile
src/hydra-queue-runner/Makefile src/hydra-queue-runner/Makefile
src/sql/Makefile src/sql/Makefile

View file

@ -58,8 +58,8 @@ rec {
src = fetchFromGitHub { src = fetchFromGitHub {
owner = "NixOS"; owner = "NixOS";
repo = "nix"; repo = "nix";
rev = "edf9eb8181e01f6b2123e5690019cfeeb44fc1c2"; rev = "5e61b422c58baac26b232233d39f5814cc35d52a";
sha256 = "1a00q9pypfziyi9hxl4rsammhwj7991wm4b1z9zcgl7zqksr3582"; sha256 = "0awic5zwibgpj5shpgjf2364imp2f84c8xi5r0x4p351q4kpg9z4";
}; };
buildInputs = attrs.buildInputs ++ [ autoreconfHook bison flex ]; buildInputs = attrs.buildInputs ++ [ autoreconfHook bison flex ];
nativeBuildInputs = attrs.nativeBuildInputs ++ [ aws-sdk-cpp' autoconf-archive ]; nativeBuildInputs = attrs.nativeBuildInputs ++ [ aws-sdk-cpp' autoconf-archive ];
@ -124,6 +124,8 @@ rec {
src = if shell then null else hydraSrc; src = if shell then null else hydraSrc;
stdenv = overrideCC stdenv gcc6;
buildInputs = buildInputs =
[ makeWrapper autoconf automake libtool unzip nukeReferences pkgconfig sqlite libpqxx [ makeWrapper autoconf automake libtool unzip nukeReferences pkgconfig sqlite libpqxx
gitAndTools.topGit mercurial darcs subversion bazaar openssl bzip2 libxslt gitAndTools.topGit mercurial darcs subversion bazaar openssl bzip2 libxslt

View file

@ -1,3 +1,3 @@
SUBDIRS = hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf SUBDIRS = hydra-evaluator hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf
BOOTCLEAN_SUBDIRS = $(SUBDIRS) BOOTCLEAN_SUBDIRS = $(SUBDIRS)
DIST_SUBDIRS = $(SUBDIRS) DIST_SUBDIRS = $(SUBDIRS)

View file

@ -0,0 +1,5 @@
# Automake rules for the hydra-evaluator daemon (the C++ rewrite of the
# top-level evaluator loop).
bin_PROGRAMS = hydra-evaluator
hydra_evaluator_SOURCES = hydra-evaluator.cc
# Links against the Nix libraries and libpqxx (PostgreSQL C++ client).
hydra_evaluator_LDADD = $(NIX_LIBS) -lpqxx
# -I ../libhydra for the shared db.hh / pool.hh headers.
hydra_evaluator_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra

View file

@ -0,0 +1,275 @@
#include "shared.hh"
#include "db.hh"
#include "pool.hh"
#include <algorithm>
#include <thread>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
using namespace nix;
/* Top-level scheduler for jobset evaluations.  Keeps an in-memory
   mirror of the enabled jobsets in the database, decides which ones
   are due (triggered via the web interface / webhooks, or past their
   check interval), and runs `hydra-eval-jobset` for each of them as a
   child process, up to a concurrency limit.  PostgreSQL LISTEN/NOTIFY
   is used so that changes are picked up without polling. */
struct Evaluator
{
    /* Pool of PostgreSQL connections shared by all threads. */
    nix::Pool<Connection> dbPool;

    /* A jobset is identified by (project name, jobset name). */
    typedef std::pair<std::string, std::string> JobsetName;

    struct Jobset
    {
        JobsetName name;
        /* `triggerTime` is `notTriggered` when no explicit trigger is
           pending. */
        time_t lastCheckedTime, triggerTime;
        /* Minimum seconds between automatic checks; <= 0 disables
           periodic checking. */
        int checkInterval;
        /* Pid of the currently running evaluation, or -1 if none. */
        Pid pid;
    };

    typedef std::map<JobsetName, Jobset> Jobsets;

    /* Hard timeout (seconds) imposed on each evaluation via
       timeout(1), so a hung hydra-eval-jobset is eventually killed. */
    int evalTimeout = 3600;

    /* Maximum number of concurrent evaluations. */
    size_t maxEvals = 4;

    struct State
    {
        size_t runningEvals = 0;
        Jobsets jobsets;
    };

    /* All mutable scheduler state, guarded by a mutex (Sync). */
    Sync<State> state_;

    /* Signalled by startEval() so the reaper thread knows there is a
       child to wait for. */
    std::condition_variable childStarted;

    /* Signalled whenever the main loop should re-examine the jobsets
       (database change or a finished evaluation). */
    std::condition_variable maybeDoWork;

    /* Sentinel for "no trigger pending". */
    const time_t notTriggered = std::numeric_limits<time_t>::max();

    /* Synchronise the in-memory jobset map with the database: upsert
       every enabled jobset of an enabled project, and forget jobsets
       that no longer match. */
    void readJobsets()
    {
        auto conn(dbPool.get());

        pqxx::work txn(*conn);

        auto res = txn.parameterized
            ("select project, j.name, lastCheckedTime, triggerTime, checkInterval from Jobsets j join Projects p on j.project = p.name "
             "where j.enabled != 0 and p.enabled != 0").exec();

        auto state(state_.lock());

        std::set<JobsetName> seen;

        for (auto const & row : res) {
            auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()};

            /* try_emplace: keeps the existing entry (and its pid) if
               the jobset is already known. */
            auto res = state->jobsets.try_emplace(name, Jobset{name});

            auto & jobset = res.first->second;
            /* NULL columns fall back to the defaults given to as(). */
            jobset.lastCheckedTime = row["lastCheckedTime"].as<time_t>(0);
            jobset.triggerTime = row["triggerTime"].as<time_t>(notTriggered);
            /* NOTE(review): checkInterval is an int but is read as
               time_t — narrowing on LP64; confirm the column range. */
            jobset.checkInterval = row["checkInterval"].as<time_t>();

            seen.insert(name);
        }

        /* Drop jobsets that disappeared from (or were disabled in)
           the database. */
        for (auto i = state->jobsets.begin(); i != state->jobsets.end(); )
            if (seen.count(i->first))
                ++i;
            else {
                printInfo("forgetting jobset %s:%s", i->first.first, i->first.second);
                i = state->jobsets.erase(i);
            }
    }

    /* Fork a child running `timeout -s KILL <evalTimeout>
       hydra-eval-jobset <project> <jobset>`, then record the new
       check time in the database and in memory.  Caller must hold the
       state lock (a locked State is passed in). */
    void startEval(State & state, Jobset & jobset)
    {
        printInfo("starting evaluation of jobset %s:%s", jobset.name.first, jobset.name.second);

        assert(jobset.pid == -1);

        jobset.pid = startProcess([&]() {
            Strings args = { "timeout", "-s", "KILL", std::to_string(evalTimeout), "hydra-eval-jobset", jobset.name.first, jobset.name.second };
            execvp(args.front().c_str(), stringsToCharPtrs(args).data());
            /* Only reached if execvp failed. */
            throw SysError(format("executing %1%") % args.front());
        });

        state.runningEvals++;

        /* Wake the reaper thread so it starts waitpid()ing. */
        childStarted.notify_one();

        time_t now = time(0);

        {
            /* Persist lastCheckedTime and clear the trigger, so other
               instances / restarts see the evaluation as started. */
            auto conn(dbPool.get());
            pqxx::work txn(*conn);
            txn.parameterized
                ("update Jobsets set lastCheckedTime = $1, triggerTime = null where project = $2 and name = $3")
                (now)
                (jobset.name.first)
                (jobset.name.second)
                .exec();
            txn.commit();
            jobset.lastCheckedTime = now;
            jobset.triggerTime = notTriggered;
        }
    }

    /* Start evaluations for all due jobsets, most urgent first, up to
       the concurrency limit.  Caller must hold the state lock. */
    void startEvals(State & state)
    {
        std::vector<Jobsets::iterator> sorted;

        time_t now = time(0);

        /* Filter out jobsets that have been evaluated recently and have
           not been triggered.  (max() here is the same value as
           notTriggered.) */
        for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i)
            if (i->second.pid == -1 &&
                (i->second.triggerTime != std::numeric_limits<time_t>::max() ||
                 (i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now)))
                sorted.push_back(i);

        /* Put jobsets in order of ascending trigger time, last checked
           time, and name. */
        std::sort(sorted.begin(), sorted.end(),
            [](const Jobsets::iterator & a, const Jobsets::iterator & b) {
                return
                    a->second.triggerTime != b->second.triggerTime
                    ? a->second.triggerTime < b->second.triggerTime
                    : a->second.lastCheckedTime != b->second.lastCheckedTime
                      ? a->second.lastCheckedTime < b->second.lastCheckedTime
                      : a->first < b->first;
            });

        /* Start jobset evaluations up to the concurrency limit.*/
        for (auto & i : sorted) {
            if (state.runningEvals >= maxEvals) break;
            startEval(state, i->second);
        }
    }

    /* Main scheduling loop: sleep until the next jobset could become
       due (or until woken via maybeDoWork), then start evaluations.
       The state lock is held except while waiting on the condvar. */
    void loop()
    {
        auto state(state_.lock());

        while (true) {

            time_t now = time(0);

            std::chrono::seconds sleepTime = std::chrono::seconds::max();

            /* Compute the time until the next periodic check is due.
               (Triggered jobsets don't need a wakeup time: a trigger
               arrives via a database notification, which signals
               maybeDoWork.) */
            if (state->runningEvals < maxEvals) {
                for (auto & i : state->jobsets)
                    if (i.second.pid == -1 &&
                        i.second.checkInterval > 0)
                        sleepTime = std::min(sleepTime, std::chrono::seconds(
                            std::max((time_t) 1, i.second.lastCheckedTime - now + i.second.checkInterval)));
            }

            debug("waiting for %d s", sleepTime.count());

            if (sleepTime == std::chrono::seconds::max())
                state.wait(maybeDoWork);
            else
                state.wait_for(maybeDoWork, sleepTime);

            startEvals(*state);
        }
    }

    /* A thread that listens to PostgreSQL notifications about jobset
       changes, updates the jobsets map, and signals the main thread
       to start evaluations. */
    void databaseMonitor()
    {
        while (true) {

            try {

                auto conn(dbPool.get());

                /* LISTEN on the channels fired by the triggers in
                   hydra.sql / upgrade-50.sql. */
                receiver jobsetsAdded(*conn, "jobsets_added");
                receiver jobsetsDeleted(*conn, "jobsets_deleted");
                receiver jobsetsChanged(*conn, "jobset_scheduling_changed");

                while (true) {
                    /* Note: we read/notify before
                       await_notification() to ensure we don't miss a
                       state change. */
                    readJobsets();
                    maybeDoWork.notify_one();
                    conn->await_notification();
                    printInfo("received jobset event");
                }

            } catch (std::exception & e) {
                /* On any database error, back off and reconnect. */
                printError("exception in database monitor thread: %s", e.what());
                sleep(30);
            }
        }
    }

    /* A thread that reaps child processes.*/
    void reaper()
    {
        while (true) {
            /* Don't call waitpid() until at least one child exists,
               otherwise it would fail with ECHILD. */
            {
                auto state(state_.lock());
                while (!state->runningEvals)
                    state.wait(childStarted);
            }

            /* Wait for any child; the lock is released here so
               evaluations can be started meanwhile. */
            int status;
            pid_t pid = waitpid(-1, &status, 0);
            if (pid == -1) {
                if (errno == EINTR) continue;
                throw SysError("waiting for children");
            }

            {
                auto state(state_.lock());
                assert(state->runningEvals);
                state->runningEvals--;
                /* Find the jobset the child belonged to, mark it as
                   no longer running, and let the main loop reconsider. */
                for (auto & jobset : state->jobsets)
                    if (jobset.second.pid == pid) {
                        printInfo("evaluation of jobset %s:%s finished with status %d",
                            jobset.first.first, jobset.first.second, status);
                        jobset.second.pid.release();
                        maybeDoWork.notify_one();
                        break;
                    }
            }
        }
    }

    /* Start the helper threads and run the scheduling loop forever,
       retrying after transient errors. */
    void run()
    {
        std::thread reaperThread([&]() { reaper(); });

        std::thread monitorThread([&]() { databaseMonitor(); });

        while (true) {
            try {
                loop();
            } catch (std::exception & e) {
                printError("exception in main loop: %s", e.what());
                sleep(30);
            }
        }
    }
};
int main(int argc, char * * argv)
{
return handleExceptions(argv[0], [&]() {
initNix();
signal(SIGINT, SIG_DFL);
signal(SIGTERM, SIG_DFL);
signal(SIGHUP, SIG_DFL);
parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
return false;
});
Evaluator().run();
});
}

View file

@ -4,5 +4,4 @@ hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc dispatcher.c
builder.cc build-result.cc build-remote.cc \ builder.cc build-result.cc build-remote.cc \
build-result.hh counter.hh token-server.hh state.hh db.hh build-result.hh counter.hh token-server.hh state.hh db.hh
hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
AM_CXXFLAGS = $(NIX_CFLAGS) -Wall

View file

@ -4,7 +4,7 @@ EXTRA_DIST = \
distributable_scripts = \ distributable_scripts = \
hydra-init \ hydra-init \
hydra-evaluator \ hydra-eval-jobset \
hydra-server \ hydra-server \
hydra-update-gc-roots \ hydra-update-gc-roots \
hydra-s3-backup-collect-garbage \ hydra-s3-backup-collect-garbage \

View file

@ -345,47 +345,10 @@ sub checkJobset {
} }
sub checkSomeJobset { die "syntax: $0 <PROJECT> <JOBSET>\n" unless @ARGV == 2;
# If any jobset has been triggered by a push, check it.
my ($jobset) = $db->resultset('Jobsets')->search(
{ 'triggertime' => { '!=', undef } },
{ join => 'project', order_by => [ 'triggertime' ], rows => 1 });
# Otherwise, check the jobset that hasn't been checked for the my $projectName = $ARGV[0];
# longest time (but don't check more often than the jobset's my $jobsetName = $ARGV[1];
# minimal check interval). my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
($jobset) = $db->resultset('Jobsets')->search( die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
{ 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 }, exit checkJobset($jobset);
, 'checkinterval' => { '!=', 0 }
, -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
{ join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
unless defined $jobset;
return 0 unless defined $jobset;
return system($0, $jobset->project->name, $jobset->name) == 0;
}
if (scalar @ARGV == 2) {
my $projectName = $ARGV[0];
my $jobsetName = $ARGV[1];
my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
exit checkJobset($jobset);
}
while (1) {
eval {
if (checkSomeJobset) {
# Just so we don't go completely crazy if lastcheckedtime
# isn't updated properly.
sleep 1;
} else {
# print STDERR "sleeping...\n";
sleep 30;
}
};
if ($@) { print STDERR "$@"; }
}

View file

@ -83,6 +83,19 @@ create function notifyJobsetSharesChanged() returns trigger as 'begin notify job
create trigger JobsetSharesChanged after update on Jobsets for each row create trigger JobsetSharesChanged after update on Jobsets for each row
when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged(); when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
create trigger JobsetSchedulingChanged after update on Jobsets for each row
when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
or old.checkInterval != new.checkInterval
or old.enabled != new.enabled)
execute procedure notifyJobsetSchedulingChanged();
#endif #endif

12
src/sql/upgrade-50.sql Normal file
View file

@ -0,0 +1,12 @@
-- Notify hydra-evaluator of jobset changes so it doesn't have to poll.
create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
-- Fire when a jobset is triggered, or its check interval / enabled
-- status changes.  The whole WHEN condition must be inside a single
-- pair of parentheses (CREATE TRIGGER ... WHEN ( condition )); the
-- previous form left the two OR terms outside the parens, which
-- PostgreSQL rejects.  This matches the condition in hydra.sql.
create trigger JobsetSchedulingChanged after update on Jobsets for each row
  when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
    or old.checkInterval != new.checkInterval
    or old.enabled != new.enabled)
  execute procedure notifyJobsetSchedulingChanged();

View file

@ -13,7 +13,7 @@ TESTS_ENVIRONMENT = \
NIX_LOG_DIR="$(abs_builddir)/nix/var/log/nix" \ NIX_LOG_DIR="$(abs_builddir)/nix/var/log/nix" \
NIX_BUILD_HOOK= \ NIX_BUILD_HOOK= \
PERL5LIB="$(srcdir):$(top_srcdir)/src/lib:$$PERL5LIB" \ PERL5LIB="$(srcdir):$(top_srcdir)/src/lib:$$PERL5LIB" \
PATH=$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \ PATH=$(abs_top_srcdir)/src/hydra-evaluator:$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
perl -w perl -w
EXTRA_DIST = \ EXTRA_DIST = \

View file

@ -61,7 +61,7 @@ sub createJobsetWithOneInput {
sub evalSucceeds { sub evalSucceeds {
my ($jobset) = @_; my ($jobset) = @_;
my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-evaluator", $jobset->project->name, $jobset->name)); my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-eval-jobset", $jobset->project->name, $jobset->name));
chomp $stdout; chomp $stderr; chomp $stdout; chomp $stderr;
print STDERR "Evaluation errors for jobset ".$jobset->project->name.":".$jobset->name.": \n".$jobset->errormsg."\n" if $jobset->errormsg; print STDERR "Evaluation errors for jobset ".$jobset->project->name.":".$jobset->name.": \n".$jobset->errormsg."\n" if $jobset->errormsg;
print STDERR "STDOUT: $stdout\n" if $stdout ne ""; print STDERR "STDOUT: $stdout\n" if $stdout ne "";

View file

@ -49,14 +49,14 @@ ok(exists $jobset->{jobsetinputs}->{"my-src"}, "The new jobset has a 'my-src' in
ok($jobset->{jobsetinputs}->{"my-src"}->{jobsetinputalts}->[0] eq "/run/jobset", "The 'my-src' input is in /run/jobset"); ok($jobset->{jobsetinputs}->{"my-src"}->{jobsetinputalts}->[0] eq "/run/jobset", "The 'my-src' input is in /run/jobset");
system("hydra-evaluator sample default"); system("hydra-eval-jobset sample default");
$result = request_json({ uri => '/jobset/sample/default/evals' }); $result = request_json({ uri => '/jobset/sample/default/evals' });
ok($result->code() == 200, "Can get evals of a jobset"); ok($result->code() == 200, "Can get evals of a jobset");
my $evals = decode_json($result->content())->{evals}; my $evals = decode_json($result->content())->{evals};
my $eval = $evals->[0]; my $eval = $evals->[0];
ok($eval->{hasnewbuilds} == 1, "The first eval of a jobset has new builds"); ok($eval->{hasnewbuilds} == 1, "The first eval of a jobset has new builds");
system("echo >> /run/jobset/default.nix; hydra-evaluator sample default"); system("echo >> /run/jobset/default.nix; hydra-eval-jobset sample default");
my $evals = decode_json(request_json({ uri => '/jobset/sample/default/evals' })->content())->{evals}; my $evals = decode_json(request_json({ uri => '/jobset/sample/default/evals' })->content())->{evals};
ok($evals->[0]->{jobsetevalinputs}->{"my-src"}->{revision} != $evals->[1]->{jobsetevalinputs}->{"my-src"}->{revision}, "Changing a jobset source changes its revision"); ok($evals->[0]->{jobsetevalinputs}->{"my-src"}->{revision} != $evals->[1]->{jobsetevalinputs}->{"my-src"}->{revision}, "Changing a jobset source changes its revision");

View file

@ -19,7 +19,7 @@ my $jobsetinput;
$jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"}); $jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"});
$jobsetinput->jobsetinputalts->create({altnr => 0, value => getcwd . "/jobs"}); $jobsetinput->jobsetinputalts->create({altnr => 0, value => getcwd . "/jobs"});
system("hydra-evaluator " . $jobset->project->name . " " . $jobset->name); system("hydra-eval-jobset " . $jobset->project->name . " " . $jobset->name);
my $successful_hash; my $successful_hash;
foreach my $build ($jobset->builds->search({finished => 0})) { foreach my $build ($jobset->builds->search({finished => 0})) {