forked from lix-project/hydra
348 lines
11 KiB
Perl
Executable file
348 lines
11 KiB
Perl
Executable file
#! /run/current-system/sw/bin/perl
|
||
|
||
use strict;
|
||
use utf8;
|
||
use Hydra::Schema;
|
||
use Hydra::Plugin;
|
||
use Hydra::Helper::Nix;
|
||
use Hydra::Helper::AddBuilds;
|
||
use Hydra::Helper::Email;
|
||
use Hydra::Model::DB;
|
||
use Digest::SHA qw(sha256_hex);
|
||
use Config::General;
|
||
use Data::Dump qw(dump);
|
||
use Try::Tiny;
|
||
|
||
STDOUT->autoflush();
|
||
STDERR->autoflush(1);
|
||
binmode STDERR, ":encoding(utf8)";
|
||
|
||
my $db = Hydra::Model::DB->new();
|
||
my $config = getHydraConfig();
|
||
|
||
my $plugins = [Hydra::Plugin->instantiate(db => $db, config => $config)];
|
||
|
||
# Don't check a jobset more than once every five minutes.
|
||
my $minCheckInterval = 5 * 60;
|
||
|
||
my $dryRun = defined $ENV{'HYDRA_DRY_RUN'};
|
||
|
||
|
||
sub fetchInputs {
|
||
my ($project, $jobset, $inputInfo) = @_;
|
||
foreach my $input ($jobset->jobsetinputs->all) {
|
||
foreach my $alt ($input->jobsetinputalts->all) {
|
||
push @{$$inputInfo{$input->name}}, $_
|
||
foreach fetchInput($plugins, $db, $project, $jobset, $input->name, $input->type, $alt->value, $input->emailresponsible);
|
||
}
|
||
}
|
||
}
|
||
|
||
|
||
sub setJobsetError {
|
||
my ($jobset, $errorMsg) = @_;
|
||
my $prevError = $jobset->errormsg;
|
||
|
||
eval {
|
||
txn_do($db, sub {
|
||
$jobset->update({ errormsg => $errorMsg, errortime => time, fetcherrormsg => undef });
|
||
});
|
||
};
|
||
if (defined $errorMsg && $errorMsg ne ($prevError // "") || $ENV{'HYDRA_MAIL_TEST'}) {
|
||
sendJobsetErrorNotification($jobset, $errorMsg);
|
||
}
|
||
}
|
||
|
||
|
||
sub sendJobsetErrorNotification() {
|
||
my ($jobset, $errorMsg) = @_;
|
||
|
||
chomp $errorMsg;
|
||
|
||
return if $jobset->project->owner->emailonerror == 0;
|
||
return if $errorMsg eq "";
|
||
|
||
my $projectName = $jobset->project->name;
|
||
my $jobsetName = $jobset->name;
|
||
my $body = "Hi,\n"
|
||
. "\n"
|
||
. "This is to let you know that evaluation of the Hydra jobset ‘$projectName:$jobsetName’\n"
|
||
. "resulted in the following error:\n"
|
||
. "\n"
|
||
. "$errorMsg"
|
||
. "\n"
|
||
. "Regards,\n\nThe Hydra build daemon.\n";
|
||
|
||
try {
|
||
sendEmail(
|
||
$config,
|
||
$jobset->project->owner->emailaddress,
|
||
"Hydra $projectName:$jobsetName evaluation error",
|
||
$body,
|
||
[ 'X-Hydra-Project' => $projectName
|
||
, 'X-Hydra-Jobset' => $jobsetName
|
||
]);
|
||
} catch {
|
||
warn "error sending email: $_\n";
|
||
};
|
||
}
|
||
|
||
|
||
sub permute {
|
||
my @list = @_;
|
||
for (my $n = scalar @list - 1; $n > 0; $n--) {
|
||
my $k = int(rand($n + 1)); # 0 <= $k <= $n
|
||
@list[$n, $k] = @list[$k, $n];
|
||
}
|
||
return @list;
|
||
}
|
||
|
||
|
||
sub checkJobsetWrapped {
|
||
my ($jobset) = @_;
|
||
my $project = $jobset->project;
|
||
my $inputInfo = {};
|
||
my $exprType = $jobset->nixexprpath =~ /.scm$/ ? "guile" : "nix";
|
||
|
||
# Fetch all values for all inputs.
|
||
my $checkoutStart = time;
|
||
eval {
|
||
fetchInputs($project, $jobset, $inputInfo);
|
||
};
|
||
if ($@) {
|
||
my $msg = $@;
|
||
print STDERR $msg;
|
||
txn_do($db, sub {
|
||
$jobset->update({ lastcheckedtime => time, fetcherrormsg => $msg }) if !$dryRun;
|
||
});
|
||
return;
|
||
}
|
||
my $checkoutStop = time;
|
||
|
||
# Hash the arguments to hydra-eval-jobs and check the
|
||
# JobsetInputHashes to see if the previous evaluation had the same
|
||
# inputs. If so, bail out.
|
||
my @args = ($jobset->nixexprinput, $jobset->nixexprpath, inputsToArgs($inputInfo, $exprType));
|
||
my $argsHash = sha256_hex("@args");
|
||
my $prevEval = getPrevJobsetEval($db, $jobset, 0);
|
||
if (defined $prevEval && $prevEval->hash eq $argsHash && !$dryRun) {
|
||
print STDERR " jobset is unchanged, skipping\n";
|
||
txn_do($db, sub {
|
||
$jobset->update({ lastcheckedtime => time, fetcherrormsg => undef });
|
||
});
|
||
return;
|
||
}
|
||
|
||
# Evaluate the job expression.
|
||
my $evalStart = time;
|
||
my ($jobs, $nixExprInput) = evalJobs($inputInfo, $exprType, $jobset->nixexprinput, $jobset->nixexprpath);
|
||
my $evalStop = time;
|
||
|
||
if ($dryRun) {
|
||
foreach my $name (keys %{$jobs}) {
|
||
my $job = $jobs->{$name};
|
||
if (defined $job->{drvPath}) {
|
||
print STDERR "good job $name: $job->{drvPath}\n";
|
||
} else {
|
||
print STDERR "failed job $name: $job->{error}\n";
|
||
}
|
||
}
|
||
return;
|
||
}
|
||
|
||
$jobs->{$_}->{jobName} = $_ for keys %{$jobs};
|
||
|
||
my $jobOutPathMap = {};
|
||
|
||
txn_do($db, sub {
|
||
|
||
my $prevEval = getPrevJobsetEval($db, $jobset, 1);
|
||
|
||
# Clear the "current" flag on all builds. Since we're in a
|
||
# transaction this will only become visible after the new
|
||
# current builds have been added.
|
||
$jobset->builds->search({iscurrent => 1})->update({iscurrent => 0});
|
||
|
||
# Schedule each successfully evaluated job.
|
||
my %buildMap;
|
||
foreach my $job (permute(values %{$jobs})) {
|
||
next if defined $job->{error};
|
||
#print STDERR "considering job " . $project->name, ":", $jobset->name, ":", $job->{jobName} . "\n";
|
||
checkBuild($db, $jobset, $inputInfo, $nixExprInput, $job, \%buildMap, $prevEval, $jobOutPathMap, $plugins);
|
||
}
|
||
|
||
# Have any builds been added or removed since last time?
|
||
my $jobsetChanged =
|
||
(scalar(grep { $_->{new} } values(%buildMap)) > 0)
|
||
|| (defined $prevEval && $prevEval->jobsetevalmembers->count != scalar(keys %buildMap));
|
||
|
||
my $ev = $jobset->jobsetevals->create(
|
||
{ hash => $argsHash
|
||
, timestamp => time
|
||
, checkouttime => abs($checkoutStop - $checkoutStart)
|
||
, evaltime => abs($evalStop - $evalStart)
|
||
, hasnewbuilds => $jobsetChanged ? 1 : 0
|
||
, nrbuilds => $jobsetChanged ? scalar(keys %buildMap) : undef
|
||
});
|
||
|
||
if ($jobsetChanged) {
|
||
# Create JobsetEvalMembers mappings.
|
||
while (my ($id, $x) = each %buildMap) {
|
||
$ev->jobsetevalmembers->create({ build => $id, isnew => $x->{new} });
|
||
}
|
||
|
||
# Create AggregateConstituents mappings. Since there can
|
||
# be jobs that alias each other, if there are multiple
|
||
# builds for the same derivation, pick the one with the
|
||
# shortest name.
|
||
my %drvPathToId;
|
||
while (my ($id, $x) = each %buildMap) {
|
||
my $y = $drvPathToId{$x->{drvPath}};
|
||
if (defined $y) {
|
||
next if length $x->{jobName} > length $y->{jobName};
|
||
next if length $x->{jobName} == length $y->{jobName} && $x->{jobName} ge $y->{jobName};
|
||
}
|
||
$drvPathToId{$x->{drvPath}} = $x;
|
||
}
|
||
|
||
foreach my $job (values %{$jobs}) {
|
||
next unless $job->{constituents};
|
||
my $x = $drvPathToId{$job->{drvPath}} or die;
|
||
foreach my $drvPath (split / /, $job->{constituents}) {
|
||
my $constituent = $drvPathToId{$drvPath};
|
||
if (defined $constituent) {
|
||
$db->resultset('AggregateConstituents')->update_or_create({aggregate => $x->{id}, constituent => $constituent->{id}});
|
||
} else {
|
||
warn "aggregate job ‘$job->{jobName}’ has a constituent ‘$drvPath’ that doesn't correspond to a Hydra build\n";
|
||
}
|
||
}
|
||
}
|
||
|
||
foreach my $name (keys %{$inputInfo}) {
|
||
for (my $n = 0; $n < scalar(@{$inputInfo->{$name}}); $n++) {
|
||
my $input = $inputInfo->{$name}->[$n];
|
||
$ev->jobsetevalinputs->create(
|
||
{ name => $name
|
||
, altnr => $n
|
||
, type => $input->{type}
|
||
, uri => $input->{uri}
|
||
, revision => $input->{revision}
|
||
, value => $input->{value}
|
||
, dependency => $input->{id}
|
||
, path => $input->{storePath} || "" # !!! temporary hack
|
||
, sha256hash => $input->{sha256hash}
|
||
});
|
||
}
|
||
}
|
||
|
||
print STDERR " created new eval ", $ev->id, "\n";
|
||
$ev->builds->update({iscurrent => 1});
|
||
|
||
$db->storage->dbh->do("notify builds_added");
|
||
} else {
|
||
print STDERR " created cached eval ", $ev->id, "\n";
|
||
$prevEval->builds->update({iscurrent => 1}) if defined $prevEval;
|
||
}
|
||
|
||
# If this is a one-shot jobset, disable it now.
|
||
$jobset->update({ enabled => 0 }) if $jobset->enabled == 2;
|
||
|
||
$jobset->update({ lastcheckedtime => time });
|
||
});
|
||
|
||
# Store the error messages for jobs that failed to evaluate.
|
||
my $msg = "";
|
||
foreach my $job (values %{$jobs}) {
|
||
next unless defined $job->{error};
|
||
$msg .=
|
||
($job->{jobName} ne "" ? "in job ‘$job->{jobName}’" : "at top-level") .
|
||
":\n" . $job->{error} . "\n\n";
|
||
}
|
||
setJobsetError($jobset, $msg);
|
||
}
|
||
|
||
|
||
sub checkJobset {
|
||
my ($jobset) = @_;
|
||
|
||
print STDERR "considering jobset ", $jobset->project->name, ":", $jobset->name,
|
||
$jobset->lastcheckedtime
|
||
? " (last checked " . (time() - $jobset->lastcheckedtime) . "s ago)\n"
|
||
: " (never checked)\n";
|
||
|
||
my $triggerTime = $jobset->triggertime;
|
||
|
||
eval {
|
||
checkJobsetWrapped($jobset);
|
||
};
|
||
|
||
my $failed = 0;
|
||
if ($@) {
|
||
my $msg = $@;
|
||
print STDERR $msg;
|
||
txn_do($db, sub {
|
||
$jobset->update({lastcheckedtime => time});
|
||
setJobsetError($jobset, $msg);
|
||
}) if !$dryRun;
|
||
$failed = 1;
|
||
}
|
||
|
||
if (defined $triggerTime) {
|
||
txn_do($db, sub {
|
||
# Only clear the trigger time if the jobset hasn't been
|
||
# triggered in the meantime. In that case, we need to
|
||
# evaluate again.
|
||
my $new = $jobset->get_from_storage();
|
||
$jobset->update({ triggertime => undef })
|
||
if $new->triggertime == $triggerTime;
|
||
}) if !$dryRun;
|
||
}
|
||
|
||
return $failed;
|
||
}
|
||
|
||
|
||
sub checkSomeJobset {
|
||
# If any jobset has been triggered by a push, check it.
|
||
my ($jobset) = $db->resultset('Jobsets')->search(
|
||
{ 'triggertime' => { '!=', undef },
|
||
, -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', time() - $minCheckInterval } ] },
|
||
{ join => 'project', order_by => [ 'triggertime' ], rows => 1 });
|
||
|
||
# Otherwise, check the jobset that hasn't been checked for the
|
||
# longest time (but don't check more often than the jobset's
|
||
# minimal check interval).
|
||
($jobset) = $db->resultset('Jobsets')->search(
|
||
{ 'project.enabled' => 1, 'me.enabled' => { '!=' => 0 },
|
||
, 'checkinterval' => { '!=', 0 }
|
||
, -or => [ 'lastcheckedtime' => undef, 'lastcheckedtime' => { '<', \ (time() . " - me.checkinterval") } ] },
|
||
{ join => 'project', order_by => [ 'lastcheckedtime nulls first' ], rows => 1 })
|
||
unless defined $jobset;
|
||
|
||
return 0 unless defined $jobset;
|
||
|
||
return system($0, $jobset->project->name, $jobset->name) == 0;
|
||
}
|
||
|
||
|
||
if (scalar @ARGV == 2) {
|
||
my $projectName = $ARGV[0];
|
||
my $jobsetName = $ARGV[1];
|
||
my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or die "$0: specified jobset does not exist\n";
|
||
exit checkJobset($jobset);
|
||
}
|
||
|
||
|
||
while (1) {
|
||
eval {
|
||
if (checkSomeJobset) {
|
||
# Just so we don't go completely crazy if lastcheckedtime
|
||
# isn't updated properly.
|
||
sleep 1;
|
||
} else {
|
||
# print STDERR "sleeping...\n";
|
||
sleep 30;
|
||
}
|
||
};
|
||
if ($@) { print STDERR "$@"; }
|
||
}
|