From e0b2921ff2f714fd243a4ef3357c5d01fffba1a5 Mon Sep 17 00:00:00 2001
From: Eelco Dolstra
Date: Thu, 13 Oct 2016 15:53:05 +0200
Subject: [PATCH] Concurrent hydra-evaluator

This rewrites the top-level loop of hydra-evaluator in C++. The Perl
stuff is moved into hydra-eval-jobset. (Rewriting the entire evaluator
would be nice but is a bit too much work.) The new version has some
advantages:

* It can run multiple jobset evaluations in parallel.

* It uses PostgreSQL notifications so it doesn't have to poll the
  database. So if a jobset is triggered via the web interface or from
  a GitHub / Bitbucket webhook, evaluation of the jobset will start
  almost instantaneously (assuming the evaluator is not at its
  concurrency limit).

* It imposes a timeout on evaluations. So if e.g. hydra-eval-jobset
  hangs connecting to a Mercurial server, it will eventually be
  killed.
---
 configure.ac                                |   3 +-
 release.nix                                 |   6 +-
 src/Makefile.am                             |   2 +-
 src/hydra-evaluator/Makefile.am             |   5 +
 src/hydra-evaluator/hydra-evaluator.cc      | 275 ++++++++++++++++++
 src/hydra-queue-runner/Makefile.am          |   3 +-
 src/{hydra-queue-runner => libhydra}/db.hh  |   0
 src/script/Makefile.am                      |   2 +-
 .../{hydra-evaluator => hydra-eval-jobset}  |  49 +---
 src/sql/hydra.sql                           |  13 +
 src/sql/upgrade-50.sql                      |  12 +
 tests/Makefile.am                           |   2 +-
 tests/Setup.pm                              |   2 +-
 tests/api-test.pl                           |   4 +-
 tests/s3-backup-test.pl                     |   2 +-
 15 files changed, 325 insertions(+), 55 deletions(-)
 create mode 100644 src/hydra-evaluator/Makefile.am
 create mode 100644 src/hydra-evaluator/hydra-evaluator.cc
 rename src/{hydra-queue-runner => libhydra}/db.hh (100%)
 rename src/script/{hydra-evaluator => hydra-eval-jobset} (89%)
 create mode 100644 src/sql/upgrade-50.sql

diff --git a/configure.ac b/configure.ac
index bf03e515..d2bb416a 100644
--- a/configure.ac
+++ b/configure.ac
@@ -11,7 +11,7 @@ AC_PROG_LN_S
 AC_PROG_LIBTOOL
 AC_PROG_CXX
 
-CXXFLAGS+=" -std=c++11"
+CXXFLAGS+=" -std=c++17"
 
 AC_PATH_PROG([XSLTPROC], [xsltproc])
 
@@ -70,6 +70,7 @@ AC_CONFIG_FILES([
   doc/Makefile
   doc/manual/Makefile
   src/Makefile
+  src/hydra-evaluator/Makefile
   src/hydra-eval-jobs/Makefile
   src/hydra-queue-runner/Makefile
   src/sql/Makefile
diff --git a/release.nix b/release.nix
index 78004213..48b224ab 100644
--- a/release.nix
+++ b/release.nix
@@ -58,8 +58,8 @@ rec {
       src = fetchFromGitHub {
         owner = "NixOS";
         repo = "nix";
-        rev = "edf9eb8181e01f6b2123e5690019cfeeb44fc1c2";
-        sha256 = "1a00q9pypfziyi9hxl4rsammhwj7991wm4b1z9zcgl7zqksr3582";
+        rev = "5e61b422c58baac26b232233d39f5814cc35d52a";
+        sha256 = "0awic5zwibgpj5shpgjf2364imp2f84c8xi5r0x4p351q4kpg9z4";
       };
       buildInputs = attrs.buildInputs ++ [ autoreconfHook bison flex ];
       nativeBuildInputs = attrs.nativeBuildInputs ++ [ aws-sdk-cpp' autoconf-archive ];
@@ -124,6 +124,8 @@ rec {
 
     src = if shell then null else hydraSrc;
 
+    stdenv = overrideCC stdenv gcc6;
+
     buildInputs =
       [ makeWrapper autoconf automake libtool unzip nukeReferences pkgconfig sqlite libpqxx
         gitAndTools.topGit mercurial darcs subversion bazaar openssl bzip2 libxslt
diff --git a/src/Makefile.am b/src/Makefile.am
index a1936113..c760eda1 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,3 +1,3 @@
-SUBDIRS = hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf
+SUBDIRS = hydra-evaluator hydra-eval-jobs hydra-queue-runner sql script lib root xsl ttf
 BOOTCLEAN_SUBDIRS = $(SUBDIRS)
 DIST_SUBDIRS = $(SUBDIRS)
diff --git a/src/hydra-evaluator/Makefile.am b/src/hydra-evaluator/Makefile.am
new file mode 100644
index 00000000..161f8567
--- /dev/null
+++ b/src/hydra-evaluator/Makefile.am
@@ -0,0 +1,5 @@
+bin_PROGRAMS = hydra-evaluator
+
+hydra_evaluator_SOURCES = hydra-evaluator.cc
+hydra_evaluator_LDADD = $(NIX_LIBS) -lpqxx
+hydra_evaluator_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
diff --git a/src/hydra-evaluator/hydra-evaluator.cc b/src/hydra-evaluator/hydra-evaluator.cc
new file mode 100644
index 00000000..cd19441e
--- /dev/null
+++ b/src/hydra-evaluator/hydra-evaluator.cc
@@ -0,0 +1,275 @@
+#include "shared.hh"
+#include "db.hh"
+#include "pool.hh"
+
+#include <algorithm>
+#include <thread>
+#include <cstring>
+
+#include <sys/types.h>
+#include <sys/wait.h>
+
+using namespace nix;
+
+struct Evaluator
+{
+    nix::Pool<Connection> dbPool;
+
+    typedef std::pair<std::string, std::string> JobsetName;
+
+    struct Jobset
+    {
+        JobsetName name;
+        time_t lastCheckedTime, triggerTime;
+        int checkInterval;
+        Pid pid;
+    };
+
+    typedef std::map<JobsetName, Jobset> Jobsets;
+
+    int evalTimeout = 3600;
+
+    size_t maxEvals = 4;
+
+    struct State
+    {
+        size_t runningEvals = 0;
+        Jobsets jobsets;
+    };
+
+    Sync<State> state_;
+
+    std::condition_variable childStarted;
+    std::condition_variable maybeDoWork;
+
+    const time_t notTriggered = std::numeric_limits<time_t>::max();
+
+    void readJobsets()
+    {
+        auto conn(dbPool.get());
+
+        pqxx::work txn(*conn);
+
+        auto res = txn.parameterized
+            ("select project, j.name, lastCheckedTime, triggerTime, checkInterval from Jobsets j join Projects p on j.project = p.name "
+             "where j.enabled != 0 and p.enabled != 0").exec();
+
+        auto state(state_.lock());
+
+        std::set<JobsetName> seen;
+
+        for (auto const & row : res) {
+            auto name = JobsetName{row["project"].as<std::string>(), row["name"].as<std::string>()};
+
+            auto res = state->jobsets.try_emplace(name, Jobset{name});
+
+            auto & jobset = res.first->second;
+            jobset.lastCheckedTime = row["lastCheckedTime"].as<time_t>(0);
+            jobset.triggerTime = row["triggerTime"].as<time_t>(notTriggered);
+            jobset.checkInterval = row["checkInterval"].as<int>();
+
+            seen.insert(name);
+        }
+
+        for (auto i = state->jobsets.begin(); i != state->jobsets.end(); )
+            if (seen.count(i->first))
+                ++i;
+            else {
+                printInfo("forgetting jobset ‘%s:%s’", i->first.first, i->first.second);
+                i = state->jobsets.erase(i);
+            }
+    }
+
+    void startEval(State & state, Jobset & jobset)
+    {
+        printInfo("starting evaluation of jobset ‘%s:%s’", jobset.name.first, jobset.name.second);
+
+        assert(jobset.pid == -1);
+
+        jobset.pid = startProcess([&]() {
+            Strings args = { "timeout", "-s", "KILL", std::to_string(evalTimeout), "hydra-eval-jobset", jobset.name.first, jobset.name.second };
+            execvp(args.front().c_str(), stringsToCharPtrs(args).data());
+            throw SysError(format("executing ‘%1%’") % args.front());
+        });
+
+        state.runningEvals++;
+
+        childStarted.notify_one();
+
+        time_t now = time(0);
+
+        {
+            auto conn(dbPool.get());
+            pqxx::work txn(*conn);
+            txn.parameterized
+                ("update Jobsets set lastCheckedTime = $1, triggerTime = null where project = $2 and name = $3")
+                (now)
+                (jobset.name.first)
+                (jobset.name.second)
+                .exec();
+            txn.commit();
+
+            jobset.lastCheckedTime = now;
+            jobset.triggerTime = notTriggered;
+        }
+    }
+
+    void startEvals(State & state)
+    {
+        std::vector<Jobsets::iterator> sorted;
+
+        time_t now = time(0);
+
+        /* Filter out jobsets that have been evaluated recently and have
+           not been triggered. */
+        for (auto i = state.jobsets.begin(); i != state.jobsets.end(); ++i)
+            if (i->second.pid == -1 &&
+                (i->second.triggerTime != std::numeric_limits<time_t>::max() ||
+                 (i->second.checkInterval > 0 && i->second.lastCheckedTime + i->second.checkInterval <= now)))
+                sorted.push_back(i);
+
+        /* Put jobsets in order of ascending trigger time, last checked
+           time, and name. */
+        std::sort(sorted.begin(), sorted.end(),
+            [](const Jobsets::iterator & a, const Jobsets::iterator & b) {
+                return
+                    a->second.triggerTime != b->second.triggerTime
+                    ? a->second.triggerTime < b->second.triggerTime
+                    : a->second.lastCheckedTime != b->second.lastCheckedTime
+                      ? a->second.lastCheckedTime < b->second.lastCheckedTime
+                      : a->first < b->first;
+            });
+
+        /* Start jobset evaluations up to the concurrency limit. */
+        for (auto & i : sorted) {
+            if (state.runningEvals >= maxEvals) break;
+            startEval(state, i->second);
+        }
+    }
+
+    void loop()
+    {
+        auto state(state_.lock());
+
+        while (true) {
+
+            time_t now = time(0);
+
+            std::chrono::seconds sleepTime = std::chrono::seconds::max();
+
+            if (state->runningEvals < maxEvals) {
+                for (auto & i : state->jobsets)
+                    if (i.second.pid == -1 &&
+                        i.second.checkInterval > 0)
+                        sleepTime = std::min(sleepTime, std::chrono::seconds(
+                            std::max((time_t) 1, i.second.lastCheckedTime - now + i.second.checkInterval)));
+            }
+
+            debug("waiting for %d s", sleepTime.count());
+            if (sleepTime == std::chrono::seconds::max())
+                state.wait(maybeDoWork);
+            else
+                state.wait_for(maybeDoWork, sleepTime);
+
+            startEvals(*state);
+        }
+    }
+
+    /* A thread that listens to PostgreSQL notifications about jobset
+       changes, updates the jobsets map, and signals the main thread
+       to start evaluations. */
+    void databaseMonitor()
+    {
+        while (true) {
+
+            try {
+
+                auto conn(dbPool.get());
+
+                receiver jobsetsAdded(*conn, "jobsets_added");
+                receiver jobsetsDeleted(*conn, "jobsets_deleted");
+                receiver jobsetsChanged(*conn, "jobset_scheduling_changed");
+
+                while (true) {
+                    /* Note: we read/notify before
+                       await_notification() to ensure we don't miss a
+                       state change. */
+                    readJobsets();
+                    maybeDoWork.notify_one();
+                    conn->await_notification();
+                    printInfo("received jobset event");
+                }
+
+            } catch (std::exception & e) {
+                printError("exception in database monitor thread: %s", e.what());
+                sleep(30);
+            }
+        }
+    }
+
+    /* A thread that reaps child processes. */
+    void reaper()
+    {
+        while (true) {
+            {
+                auto state(state_.lock());
+                while (!state->runningEvals)
+                    state.wait(childStarted);
+            }
+
+            int status;
+            pid_t pid = waitpid(-1, &status, 0);
+            if (pid == -1) {
+                if (errno == EINTR) continue;
+                throw SysError("waiting for children");
+            }
+
+            {
+                auto state(state_.lock());
+                assert(state->runningEvals);
+                state->runningEvals--;
+                for (auto & jobset : state->jobsets)
+                    if (jobset.second.pid == pid) {
+                        printInfo("evaluation of jobset ‘%s:%s’ finished with status %d",
+                            jobset.first.first, jobset.first.second, status);
+                        jobset.second.pid.release();
+                        maybeDoWork.notify_one();
+                        break;
+                    }
+            }
+        }
+    }
+
+    void run()
+    {
+        std::thread reaperThread([&]() { reaper(); });
+
+        std::thread monitorThread([&]() { databaseMonitor(); });
+
+        while (true) {
+            try {
+                loop();
+            } catch (std::exception & e) {
+                printError("exception in main loop: %s", e.what());
+                sleep(30);
+            }
+        }
+    }
+};
+
+int main(int argc, char * * argv)
+{
+    return handleExceptions(argv[0], [&]() {
+        initNix();
+
+        signal(SIGINT, SIG_DFL);
+        signal(SIGTERM, SIG_DFL);
+        signal(SIGHUP, SIG_DFL);
+
+        parseCmdLine(argc, argv, [&](Strings::iterator & arg, const Strings::iterator & end) {
+            return false;
+        });
+
+        Evaluator().run();
+    });
+}
diff --git a/src/hydra-queue-runner/Makefile.am b/src/hydra-queue-runner/Makefile.am
index e1d03867..b360faed 100644
--- a/src/hydra-queue-runner/Makefile.am
+++ b/src/hydra-queue-runner/Makefile.am
@@ -4,5 +4,4 @@ hydra_queue_runner_SOURCES = hydra-queue-runner.cc queue-monitor.cc \
 	dispatcher.cc builder.cc build-result.cc build-remote.cc \
 	build-result.hh counter.hh token-server.hh state.hh db.hh
 hydra_queue_runner_LDADD = $(NIX_LIBS) -lpqxx
-
-AM_CXXFLAGS = $(NIX_CFLAGS) -Wall
+hydra_queue_runner_CXXFLAGS = $(NIX_CFLAGS) -Wall -I ../libhydra
diff --git a/src/hydra-queue-runner/db.hh b/src/libhydra/db.hh
similarity index 100%
rename from src/hydra-queue-runner/db.hh
rename to src/libhydra/db.hh
diff --git a/src/script/Makefile.am b/src/script/Makefile.am
index 9b389bd4..5852bc85 100644
--- a/src/script/Makefile.am
+++ b/src/script/Makefile.am
@@ -4,7 +4,7 @@ EXTRA_DIST = \
 
 distributable_scripts = \
 	hydra-init \
-	hydra-evaluator \
+	hydra-eval-jobset \
 	hydra-server \
 	hydra-update-gc-roots \
 	hydra-s3-backup-collect-garbage \
diff --git a/src/script/hydra-evaluator b/src/script/hydra-eval-jobset
similarity index 89%
rename from src/script/hydra-evaluator
rename to src/script/hydra-eval-jobset
index 7568ac32..ba478f6a 100755
--- a/src/script/hydra-evaluator
+++ b/src/script/hydra-eval-jobset
@@ -345,47 +345,10 @@ sub checkJobset {
 }
 
 
-sub checkSomeJobset {
-    # If any jobset has been triggered by a push, check it.
-    my ($jobset) = $db->resultset('Jobsets')->search(
-        { 'triggertime' => { '!=', undef } },
-        { join => 'project', order_by => [ 'triggertime' ], rows => 1 });
+die "syntax: $0 <project> <jobset>\n" unless @ARGV == 2;
 
-    # Otherwise, check the jobset that hasn't been checked for the
-    # longest time (but don't check more often than the jobset's
-    # minimal check interval).
-    ($jobset) = $db->resultset('Jobsets')->search(
-        { 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 },
-        , 'checkinterval' => { '!=', 0 }
-        , -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
-        { join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
-        unless defined $jobset;
-
-    return 0 unless defined $jobset;
-
-    return system($0, $jobset->project->name, $jobset->name) == 0;
-}
-
-
-if (scalar @ARGV == 2) {
-    my $projectName = $ARGV[0];
-    my $jobsetName = $ARGV[1];
-    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
-        die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
-    exit checkJobset($jobset);
-}
-
-
-while (1) {
-    eval {
-        if (checkSomeJobset) {
-            # Just so we don't go completely crazy if lastcheckedtime
-            # isn't updated properly.
-            sleep 1;
-        } else {
-            # print STDERR "sleeping...\n";
-            sleep 30;
-        }
-    };
-    if ($@) { print STDERR "$@"; }
-}
+my $projectName = $ARGV[0];
+my $jobsetName = $ARGV[1];
+my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or
+    die "$0: specified jobset \"$projectName:$jobsetName\" does not exist\n";
+exit checkJobset($jobset);
diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql
index db5bb80b..2d996760 100644
--- a/src/sql/hydra.sql
+++ b/src/sql/hydra.sql
@@ -83,6 +83,19 @@ create function notifyJobsetSharesChanged() returns trigger as 'begin notify job
 create trigger JobsetSharesChanged after update on Jobsets for each row
   when (old.schedulingShares != new.schedulingShares) execute procedure notifyJobsetSharesChanged();
 
+create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
+create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
+
+create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
+create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
+
+create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
+create trigger JobsetSchedulingChanged after update on Jobsets for each row
+  when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
+        or old.checkInterval != new.checkInterval
+        or old.enabled != new.enabled)
+  execute procedure notifyJobsetSchedulingChanged();
+
 
 
 #endif
diff --git a/src/sql/upgrade-50.sql b/src/sql/upgrade-50.sql
new file mode 100644
index 00000000..b19dfa5f
--- /dev/null
+++ b/src/sql/upgrade-50.sql
@@ -0,0 +1,12 @@
+create function notifyJobsetsAdded() returns trigger as 'begin notify jobsets_added; return null; end;' language plpgsql;
+create trigger JobsetsAdded after insert on Jobsets execute procedure notifyJobsetsAdded();
+
+create function notifyJobsetsDeleted() returns trigger as 'begin notify jobsets_deleted; return null; end;' language plpgsql;
+create trigger JobsetsDeleted after delete on Jobsets execute procedure notifyJobsetsDeleted();
+
+create function notifyJobsetSchedulingChanged() returns trigger as 'begin notify jobset_scheduling_changed; return null; end;' language plpgsql;
+create trigger JobsetSchedulingChanged after update on Jobsets for each row
+  when (((old.triggerTime is distinct from new.triggerTime) and (new.triggerTime is not null))
+        or old.checkInterval != new.checkInterval
+        or old.enabled != new.enabled)
+  execute procedure notifyJobsetSchedulingChanged();
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 57a19468..fd7496f3 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -13,7 +13,7 @@ TESTS_ENVIRONMENT = \
 	NIX_LOG_DIR="$(abs_builddir)/nix/var/log/nix" \
 	NIX_BUILD_HOOK= \
 	PERL5LIB="$(srcdir):$(top_srcdir)/src/lib:$$PERL5LIB" \
-	PATH=$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
+	PATH=$(abs_top_srcdir)/src/hydra-evaluator:$(abs_top_srcdir)/src/script:$(abs_top_srcdir)/src/hydra-eval-jobs:$(abs_top_srcdir)/src/hydra-queue-runner:$$PATH \
 	perl -w
 
 EXTRA_DIST = \
diff --git a/tests/Setup.pm b/tests/Setup.pm
index 9985cca2..516c9e7f 100644
--- a/tests/Setup.pm
+++ b/tests/Setup.pm
@@ -61,7 +61,7 @@ sub createJobsetWithOneInput {
 
 sub evalSucceeds {
     my ($jobset) = @_;
-    my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-evaluator", $jobset->project->name, $jobset->name));
+    my ($res, $stdout, $stderr) = captureStdoutStderr(60, ("hydra-eval-jobset", $jobset->project->name, $jobset->name));
     chomp $stdout; chomp $stderr;
     print STDERR "Evaluation errors for jobset ".$jobset->project->name.":".$jobset->name.": \n".$jobset->errormsg."\n" if $jobset->errormsg;
     print STDERR "STDOUT: $stdout\n" if $stdout ne "";
diff --git a/tests/api-test.pl b/tests/api-test.pl
index 99f05780..5cc433be 100644
--- a/tests/api-test.pl
+++ b/tests/api-test.pl
@@ -49,14 +49,14 @@ ok(exists $jobset->{jobsetinputs}->{"my-src"}, "The new jobset has a 'my-src' in
 ok($jobset->{jobsetinputs}->{"my-src"}->{jobsetinputalts}->[0] eq "/run/jobset", "The 'my-src' input is in /run/jobset");
 
-system("hydra-evaluator sample default");
+system("hydra-eval-jobset sample default");
 
 $result = request_json({ uri => '/jobset/sample/default/evals' });
 ok($result->code() == 200, "Can get evals of a jobset");
 my $evals = decode_json($result->content())->{evals};
 my $eval = $evals->[0];
 ok($eval->{hasnewbuilds} == 1, "The first eval of a jobset has new builds");
 
-system("echo >> /run/jobset/default.nix; hydra-evaluator sample default");
+system("echo >> /run/jobset/default.nix; hydra-eval-jobset sample default");
 
 my $evals = decode_json(request_json({ uri => '/jobset/sample/default/evals' })->content())->{evals};
 ok($evals->[0]->{jobsetevalinputs}->{"my-src"}->{revision} != $evals->[1]->{jobsetevalinputs}->{"my-src"}->{revision}, "Changing a jobset source changes its revision");
diff --git a/tests/s3-backup-test.pl b/tests/s3-backup-test.pl
index a81d2d22..372a9697 100644
--- a/tests/s3-backup-test.pl
+++ b/tests/s3-backup-test.pl
@@ -19,7 +19,7 @@ my $jobsetinput;
 $jobsetinput = $jobset->jobsetinputs->create({name => "jobs", type => "path"});
 $jobsetinput->jobsetinputalts->create({altnr => 0, value => getcwd . "/jobs"});
 
-system("hydra-evaluator " . $jobset->project->name . " " . $jobset->name);
+system("hydra-eval-jobset " . $jobset->project->name . " " . $jobset->name);
 
 my $successful_hash;
 foreach my $build ($jobset->builds->search({finished => 0})) {
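
Illustration (not part of the patch): with the new triggers in place, the
evaluator no longer polls; anything that sets a jobset's triggerTime makes
PostgreSQL notify the databaseMonitor thread, which re-reads the Jobsets
table and starts an evaluation as soon as a concurrency slot is free. A
minimal sketch from psql, assuming a hypothetical jobset 'patchelf:master'
exists:

    -- Setting a non-null triggerTime satisfies the WHEN clause of the
    -- JobsetSchedulingChanged trigger, so PostgreSQL issues
    -- "notify jobset_scheduling_changed" and hydra-evaluator wakes up.
    update Jobsets set triggerTime = extract(epoch from now())::int
      where project = 'patchelf' and name = 'master';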