hydra-eval-jobset: incrementally ingest eval results

nix-eval-jobs streams output, unlike hydra-eval-jobs. Now that we've
migrated, we can use this to:

1. Use less RAM by avoiding buffering a whole eval's worth of metadata
   into a Perl string and an array of JSON objects.
2. Make evals latency a bit lower by allowing the queue runner to start
   ingesting builds faster.
This commit is contained in:
Pierre Bourdon 2024-07-17 12:05:41 +02:00
parent 370a4bf138
commit b0e9b4b2f9
Signed by untrusted user: delroth
GPG key ID: 6FB80DCD84DA0F1C

View file

@ -17,6 +17,7 @@ use Hydra::Helper::Nix;
use Hydra::Model::DB; use Hydra::Model::DB;
use Hydra::Plugin; use Hydra::Plugin;
use Hydra::Schema; use Hydra::Schema;
use IPC::Run;
use JSON::MaybeXS; use JSON::MaybeXS;
use Net::Statsd; use Net::Statsd;
use Nix::Store; use Nix::Store;
@ -383,23 +384,33 @@ sub evalJobs {
print STDERR "evaluator: @escaped\n"; print STDERR "evaluator: @escaped\n";
} }
(my $res, my $jobsJSONLines, my $stderr) = captureStdoutStderr(21600, @cmd); my $h = IPC::Run::start \@cmd,
die "nix-eval-jobs returned " . ($res & 127 ? "signal $res" : "exit code " . ($res >> 8)) '>', IPC::Run::new_chunker, \my $out,
. ":\n" . ($stderr ? decode("utf-8", $stderr) : "(no output)\n") '2>', \my $err;
if $res;
print STDERR "$stderr"; return sub {
while (1) {
$h->pump;
if (!defined $out && !defined $err) {
$h->finish;
if ($?) {
die "nix-eval-jobs returned " . ($? & 127 ? "signal $?" : "exit code " . ($? >> 8)) . "\n";
}
return;
}
# XXX: take advantage of nix-eval-jobs's streaming instead of parsing everything in one block at if (defined $err) {
# the end. print STDERR "$err";
my @jobs; undef $err;
foreach my $line (split(/\n/, $jobsJSONLines)) { }
last if $line eq "";
push(@jobs, decode_json($line)); if (defined $out && $out ne '') {
my $job = decode_json($out);
undef $out;
return $job;
}
}
}; };
return @jobs;
} }
@ -716,17 +727,11 @@ sub checkJobsetWrapped {
# Evaluate the job expression. # Evaluate the job expression.
my $evalStart = clock_gettime(CLOCK_MONOTONIC); my $evalStart = clock_gettime(CLOCK_MONOTONIC);
my @jobs = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef); my $evalStop;
my $evalStop = clock_gettime(CLOCK_MONOTONIC); my $jobsIter = evalJobs($project->name . ":" . $jobset->name, $inputInfo, $jobset->nixexprinput, $jobset->nixexprpath, $flakeRef);
if ($jobsetsJobset) {
die "The .jobsets jobset must only have a single job named 'jobsets'"
unless (scalar @jobs) == 1 && $jobs[0]->{attr} eq "jobsets";
}
Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
if ($dryRun) { if ($dryRun) {
foreach my $job (@jobs) { while (defined(my $job = $jobsIter->())) {
my $name = $job->{attr}; my $name = $job->{attr};
if (defined $job->{drvPath}) { if (defined $job->{drvPath}) {
print STDERR "good job $name: $job->{drvPath}\n"; print STDERR "good job $name: $job->{drvPath}\n";
@ -737,31 +742,20 @@ sub checkJobsetWrapped {
return; return;
} }
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my $dbStart = clock_gettime(CLOCK_MONOTONIC);
# Store the error messages for jobs that failed to evaluate. # Store the error messages for jobs that failed to evaluate.
my $evaluationErrorTime = time; my $evaluationErrorTime = time;
my $evaluationErrorMsg = ""; my $evaluationErrorMsg = "";
foreach my $job (@jobs) {
next unless defined $job->{error};
$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job $job->{attr}" : "at top-level") .
":\n" . $job->{error} . "\n\n";
}
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create( my $evaluationErrorRecord = $db->resultset('EvaluationErrors')->create(
{ errormsg => $evaluationErrorMsg { errormsg => $evaluationErrorMsg
, errortime => $evaluationErrorTime , errortime => $evaluationErrorTime
} }
); );
my $jobOutPathMap = {};
my $jobsetChanged = 0;
my %buildMap; my %buildMap;
$db->txn_do(sub {
$db->txn_do(sub {
my $prevEval = getPrevJobsetEval($db, $jobset, 1); my $prevEval = getPrevJobsetEval($db, $jobset, 1);
# Clear the "current" flag on all builds. Since we're in a # Clear the "current" flag on all builds. Since we're in a
@ -774,7 +768,7 @@ sub checkJobsetWrapped {
, evaluationerror => $evaluationErrorRecord , evaluationerror => $evaluationErrorRecord
, timestamp => time , timestamp => time
, checkouttime => abs(int($checkoutStop - $checkoutStart)) , checkouttime => abs(int($checkoutStop - $checkoutStart))
, evaltime => abs(int($evalStop - $evalStart)) , evaltime => 0
, hasnewbuilds => 0 , hasnewbuilds => 0
, nrbuilds => 0 , nrbuilds => 0
, flake => $flakeRef , flake => $flakeRef
@ -782,11 +776,18 @@ sub checkJobsetWrapped {
, nixexprpath => $jobset->nixexprpath , nixexprpath => $jobset->nixexprpath
}); });
# Schedule each successfully evaluated job. while (defined(my $job = $jobsIter->())) {
foreach my $job (permute(@jobs)) { if ($jobsetsJobset) {
next if defined $job->{error}; die "The .jobsets jobset must only have a single job named 'jobsets'"
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n"; unless $job->{attr} eq "jobsets";
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins); }
$evaluationErrorMsg .=
($job->{attr} ne "" ? "in job $job->{attr}" : "at top-level") .
":\n" . $job->{error} . "\n\n" if defined $job->{error};
checkBuild($db, $jobset, $ev, $inputInfo, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins)
unless defined $job->{error};
} }
# Have any builds been added or removed since last time? # Have any builds been added or removed since last time?
@ -846,11 +847,15 @@ sub checkJobsetWrapped {
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2; $jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
$jobset->update({ lastcheckedtime => time, forceeval => undef }); $jobset->update({ lastcheckedtime => time, forceeval => undef });
$evaluationErrorRecord->update({ errormsg => $evaluationErrorMsg });
setJobsetError($jobset, $evaluationErrorMsg, $evaluationErrorTime);
$evalStop = clock_gettime(CLOCK_MONOTONIC);
$ev->update({ evaltime => abs(int($evalStop - $evalStart)) });
}); });
my $dbStop = clock_gettime(CLOCK_MONOTONIC); Net::Statsd::timing("hydra.evaluator.eval_time", int(($evalStop - $evalStart) * 1000));
Net::Statsd::timing("hydra.evaluator.db_time", int(($dbStop - $dbStart) * 1000));
Net::Statsd::increment("hydra.evaluator.evals"); Net::Statsd::increment("hydra.evaluator.evals");
Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged; Net::Statsd::increment("hydra.evaluator.cached_evals") unless $jobsetChanged;
} }