hydra/src/script/hydra_scheduler.pl

535 lines
17 KiB
Perl
Raw Normal View History

2008-11-28 16:13:06 +00:00
#! /var/run/current-system/sw/bin/perl -w
use strict;
2009-03-09 13:04:46 +00:00
use feature 'switch';
use XML::Simple;
2008-11-25 11:09:15 +00:00
use Hydra::Schema;
2008-11-28 14:36:04 +00:00
use Hydra::Helper::Nix;
use IPC::Run;
use POSIX qw(strftime);
2009-04-22 22:59:54 +00:00
# Turn on autoflush for standard output so that the progress messages
# printed below appear immediately rather than when the buffer fills.
STDOUT->autoflush();
2008-11-28 14:36:04 +00:00
# File-wide database handle used by every sub below (resultset lookups
# and txn_do transactions).  openHydraDB is not defined in this file;
# presumably it is exported by Hydra::Helper::Nix -- TODO confirm.
my $db = openHydraDB;
2008-11-05 04:52:52 +00:00
# Run a command, given as a list consisting of the program followed by
# its arguments, with empty standard input.  Returns a three-element
# list: the result of IPC::Run::run (true on success), the text the
# command wrote to stdout, and the text it wrote to stderr.
sub captureStdoutStderr {
    my @command = @_;
    my ($input, $output, $errors) = ("");
    my $ok = IPC::Run::run(\@command, \$input, \$output, \$errors);
    return ($ok, $output, $errors);
}
2008-11-12 23:14:57 +00:00
# Return the SHA-256 hash of a Nix store path as a base-16 string.
#
# The hash is obtained with `nix-store --query --hash', which is
# expected to print it as "sha256:<hash>"; the part after the prefix is
# then converted with `nix-hash --to-base16 --type sha256'.
#
# Both commands are run from argument lists (via captureStdoutStderr)
# rather than through the shell, so a store path containing spaces or
# shell metacharacters cannot alter the command line.
#
# Dies if either command fails or produces no output, or if the first
# command's output does not have the expected "sha256:" prefix.
sub getStorePathHash {
    my ($storePath) = @_;

    my ($res, $hash, $stderr) = captureStdoutStderr(
        "nix-store", "--query", "--hash", $storePath);
    die "cannot get hash of $storePath:\n" . ($stderr // "")
        unless $res && $hash;
    chomp $hash;

    $hash =~ /^sha256:(.*)$/
        or die "unexpected hash format \"$hash\" for $storePath";
    $hash = $1;

    ($res, $hash, $stderr) = captureStdoutStderr(
        "nix-hash", "--to-base16", "--type", "sha256", $hash);
    die "cannot convert hash:\n" . ($stderr // "")
        unless $res && $hash;
    chomp $hash;

    return $hash;
}
# Parse a job specification of the form `<project>:<jobset>:<job>
# [attrs]'.  The project, jobset and attrs may be omitted.  The
# attrs have the form `name = "value"'.
#
# Returns the list (project, jobset, job, attrs), where project and
# jobset are undef when omitted and attrs is a reference to a fresh
# hash mapping attribute names to values (empty when there are none).
# Dies with "invalid job specifier" if the string does not match.
#
# The attributes are extracted in a second pass over the bracketed
# text into a lexical hash, so each call returns its own hash and a
# later call cannot clobber an earlier result.
sub parseJobName {
    my ($s) = @_;

    my $nameRE = qr/[\w\-]+/;

    $s =~ / ^ (?: (?: ($nameRE) : )? ($nameRE) : )? ($nameRE) \s*
            (?: \[ \s* ( (?: \w+ \s* = \s* " [\w\-]+ " \s* )* ) \] )? $
          /x
        or die "invalid job specifier `$s'";
    my ($project, $jobset, $job, $attrText) = ($1, $2, $3, $4);

    my %attrs;
    if (defined $attrText) {
        # The overall match above already validated the syntax, so
        # this loop only has to pick out the name/value pairs.  A
        # repeated name keeps its last value.
        while ($attrText =~ / (\w+) \s* = \s* " ([\w\-]+) " /gx) {
            $attrs{$1} = $2;
        }
    }

    return ($project, $jobset, $job, \%attrs);
}
# Build a SQL condition requiring that the build identified by the SQL
# expression $id has, for every name/value pair in the hash reference
# $attrs, a matching row in the `buildinputs' table.
#
# Returns a string starting with "1 = 1", followed by one
# " and exists (...)" clause per attribute; attributes are emitted in
# sorted name order so that the result is deterministic.
#
# Dies if an attribute name or value contains anything other than word
# characters and dashes.
sub attrsToSQL {
    my ($attrs, $id) = @_;

    my $query = "1 = 1";

    foreach my $name (sort keys %{$attrs}) {
        my $value = $attrs->{$name};

        # \A and \z anchor the whole string; `$' would also accept a
        # trailing newline.
        $name =~ /\A[\w\-]+\z/
            or die "invalid attribute name in job specifier";
        (defined $value && $value =~ /\A[\w\-]+\z/)
            or die "invalid value for attribute \"$name\" in job specifier";

        # !!! Yes, this is horribly injection-prone... (though
        # name/value are filtered above).  Should use SQL::Abstract,
        # but it can't deal with subqueries.  At least we should use
        # placeholders.
        $query .= " and exists (select 1 from buildinputs where build = $id and name = '$name' and value = '$value')";
    }

    return $query;
}
2009-03-09 13:04:46 +00:00
# Fetch a "path" input: copy the path named by the alternative's value
# into the Nix store (unless it was checked within the last 60
# seconds) and return a hash reference describing it.
sub _fetchPathInput {
    my ($type, $input, $alt) = @_;

    my $uri = $alt->value;
    my $timestamp = time;
    my $sha256;
    my $storePath;

    # Some simple caching: don't check a path more than once every N seconds.
    (my $cachedInput) = $db->resultset('CachedPathInputs')->search(
        {srcpath => $uri, lastseen => {">", $timestamp - 60}},
        {rows => 1, order_by => "lastseen DESC"});

    if (defined $cachedInput && isValidPath($cachedInput->storepath)) {
        $storePath = $cachedInput->storepath;
        $sha256 = $cachedInput->sha256hash;
        $timestamp = $cachedInput->timestamp;
    } else {
        print "copying input ", $input->name, " from $uri\n";

        # Run nix-store from an argument list instead of through the
        # shell, so that the URI cannot be interpreted as shell syntax.
        my ($res, $stdout, $stderr) = captureStdoutStderr("nix-store", "--add", $uri);
        die "cannot copy path $uri to the Nix store:\n" . ($stderr // "")
            unless $res && $stdout;
        $storePath = $stdout;
        chomp $storePath;

        $sha256 = getStorePathHash $storePath;

        ($cachedInput) = $db->resultset('CachedPathInputs')->search(
            {srcpath => $uri, sha256hash => $sha256});

        # Path inputs don't have a natural notion of a "revision",
        # so we simulate it by using the timestamp that we first
        # saw this path have this SHA-256 hash.  So if the
        # contents of the path changes, we get a new "revision",
        # but if it doesn't change (or changes back), we don't get
        # a new "revision".
        if (!defined $cachedInput) {
            txn_do($db, sub {
                $db->resultset('CachedPathInputs')->create(
                    { srcpath => $uri
                    , timestamp => $timestamp
                    , lastseen => $timestamp
                    , sha256hash => $sha256
                    , storepath => $storePath
                    });
            });
        } else {
            $timestamp = $cachedInput->timestamp;
            txn_do($db, sub {
                $cachedInput->update({lastseen => time});
            });
        }
    }

    return
        { type => $type
        , uri => $uri
        , storePath => $storePath
        , sha256hash => $sha256
        , revision => strftime("%Y%m%d%H%M%S", gmtime($timestamp))
        };
}

# Fetch an "svn" input: determine the last-modified revision of the
# URI, check that revision out into the Nix store with
# nix-prefetch-svn (unless a valid cached copy exists) and return a
# hash reference describing it.
sub _fetchSvnInput {
    my ($type, $input, $alt) = @_;

    my $uri = $alt->value;
    my $sha256;
    my $storePath;

    # First figure out the last-modified revision of the URI.
    my @cmd = (["svn", "ls", "-v", "--depth", "empty", $uri],
               "|", ["sed", 's/^ *\([0-9]*\).*/\1/']);
    my $stdout; my $stderr;
    die "cannot get head revision of Subversion repository at `$uri':\n$stderr"
        unless IPC::Run::run(@cmd, \$stdout, \$stderr);
    my $revision = $stdout; chomp $revision;
    die "cannot parse head revision \"$revision\" of Subversion repository at $uri"
        unless $revision =~ /^\d+$/;

    (my $cachedInput) = $db->resultset('CachedSubversionInputs')->search(
        {uri => $uri, revision => $revision});

    if (defined $cachedInput && isValidPath($cachedInput->storepath)) {
        $storePath = $cachedInput->storepath;
        $sha256 = $cachedInput->sha256hash;
    } else {
        # Then download this revision into the store.
        print "checking out Subversion input ", $input->name, " from $uri revision $revision\n";

        # These settings are only meant for nix-prefetch-svn, so
        # confine them to this block instead of changing the
        # environment of every command run afterwards.
        local $ENV{"NIX_HASH_ALGO"} = "sha256";
        local $ENV{"PRINT_PATH"} = "1";

        (my $res, $stdout, $stderr) = captureStdoutStderr(
            "nix-prefetch-svn", $uri, $revision);
        die "cannot check out Subversion repository `$uri':\n$stderr" unless $res;

        # The output is split on whitespace into the hash followed by
        # the store path.
        ($sha256, $storePath) = split ' ', $stdout;

        txn_do($db, sub {
            $db->resultset('CachedSubversionInputs')->create(
                { uri => $uri
                , revision => $revision
                , sha256hash => $sha256
                , storepath => $storePath
                });
        });
    }

    return
        { type => $type
        , uri => $uri
        , storePath => $storePath
        , sha256hash => $sha256
        , revision => $revision
        };
}

# Fetch a "build" input: look up the most recent successful build of
# the job named by the alternative's value (a job specifier as
# accepted by parseJobName).  Returns a hash reference describing the
# build, or undef if there is no such build whose output path is
# still valid.
sub _fetchBuildInput {
    my ($project, $jobset, $input, $alt) = @_;

    my ($projectName, $jobsetName, $jobName, $attrs) = parseJobName($alt->value);
    $projectName ||= $project->name;
    $jobsetName ||= $jobset->name;

    # Pick the most recent successful build of the specified job.
    (my $prevBuild) = $db->resultset('Builds')->search(
        { finished => 1, project => $projectName, jobset => $jobsetName
        , job => $jobName, buildStatus => 0 },
        { join => 'resultInfo', order_by => "me.id DESC", rows => 1
        , where => \ attrsToSQL($attrs, "me.id") });

    if (!defined $prevBuild || !isValidPath($prevBuild->outpath)) {
        print STDERR "input `", $input->name, "': no previous build available\n";
        return undef;
    }

    #print STDERR "input `", $input->name, "': using build ", $prevBuild->id, "\n";

    my $pkgNameRE = "(?:(?:[A-Za-z0-9]|(?:-[^0-9]))+)";
    my $versionRE = "(?:[A-Za-z0-9\.\-]+)";

    my $relName = ($prevBuild->resultInfo->releasename or $prevBuild->nixname);

    # Declare first, assign conditionally: the behaviour of
    # `my $x = ... if ...' is undefined in Perl.
    my $version;
    $version = $2 if $relName =~ /^($pkgNameRE)-($versionRE)$/;

    return
        { type => "build"
        , storePath => $prevBuild->outpath
        , id => $prevBuild->id
        , version => $version
        };
}

# Fetch one alternative ($alt) of a jobset input ($input) and return a
# hash reference describing it, depending on the input's type:
#
#   path     type, uri, storePath, sha256hash, revision (a timestamp)
#   svn      type, uri, storePath, sha256hash, revision
#   build    type, storePath, id, version; undef is returned instead
#            when no suitable previous build is available
#   string   type, value
#   boolean  type, value (must be "true" or "false")
#
# $project and $jobset supply the default project and jobset names for
# "build" inputs.  Dies on an unknown type, on a missing or invalid
# value, or when an external command fails.
sub fetchInputAlt {
    my ($project, $jobset, $input, $alt) = @_;

    my $type = $input->type;

    if ($type eq "path") {
        return _fetchPathInput($type, $input, $alt);
    }

    elsif ($type eq "svn") {
        return _fetchSvnInput($type, $input, $alt);
    }

    elsif ($type eq "build") {
        return _fetchBuildInput($project, $jobset, $input, $alt);
    }

    elsif ($type eq "string") {
        die "input `" . $input->name . "' has no value"
            unless defined $alt->value;
        return {type => $type, value => $alt->value};
    }

    elsif ($type eq "boolean") {
        die "input `" . $input->name . "' must be either \"true\" or \"false\""
            unless defined $alt->value && ($alt->value eq "true" || $alt->value eq "false");
        return {type => $type, value => $alt->value};
    }

    else {
        die "input `" . $input->name . "' has unknown type `$type'";
    }
}
2009-03-09 13:04:46 +00:00
# Fetch every alternative of every input of $jobset and collect the
# results in the hash reference $inputInfo: each input name maps to an
# array of the hashes returned by fetchInputAlt, in the order the
# alternatives are returned.  Alternatives for which fetchInputAlt
# returns undef are left out.
sub fetchInputs {
    my ($project, $jobset, $inputInfo) = @_;

    for my $jobsetInput ($jobset->jobsetinputs->all) {
        for my $alternative ($jobsetInput->jobsetinputalts->all) {
            my $fetched = fetchInputAlt($project, $jobset, $jobsetInput, $alternative);
            next unless defined $fetched;
            push @{ $inputInfo->{$jobsetInput->name} }, $fetched;
        }
    }
}
2008-11-07 14:51:44 +00:00
# Consider one evaluated job ($job, a hash from the evaluator's XML
# output) of $jobset and make sure a build for its output path exists.
#
# Inside a single transaction this:
#   * marks the job as active and records the evaluation time;
#   * if an unfinished build with the same output path exists, or a
#     finished one whose output is still valid, records that build's
#     id in the hash reference $currentBuilds and stops;
#   * otherwise creates a new build with its scheduling info and one
#     buildinputs row per input, and records its id in $currentBuilds.
#
# $inputInfo is the hash filled in by fetchInputs; $nixExprInput is the
# fetched input containing the Nix expression.  Dies with "invalid
# input" if the job refers to an input alternative that was not
# fetched.
sub checkJob {
    my ($project, $jobset, $inputInfo, $nixExprInput, $job, $currentBuilds) = @_;

    my $jobName = $job->{jobName};
    my $drvPath = $job->{drvPath};
    my $outPath = $job->{outPath};

    # Default priority, overridden only by a plain non-negative
    # integer.  The `defined' test avoids an "uninitialized value"
    # warning for jobs that do not specify a priority.
    my $priority = 100;
    my $requestedPriority = $job->{schedulingPriority};
    $priority = int($requestedPriority)
        if defined $requestedPriority && $requestedPriority =~ /^\d+$/;

    txn_do($db, sub {
        # Mark this job as active in the database.
        my $jobInDB = $jobset->jobs->update_or_create(
            { name => $jobName
            , active => 1
            , lastevaltime => time
            });

        $jobInDB->update({firstevaltime => time})
            unless defined $jobInDB->firstevaltime;

        # Have we already done this build (in this job)?  Don't do it
        # again unless it has been garbage-collected.  The latest
        # builds for each platforms are GC roots, so they shouldn't be
        # GCed.  However, if a job has reverted to a previous state,
        # it's possible that a GCed build becomes current again.  In
        # that case we have to rebuild it to ensure that it appears in
        # channels etc.
        my @previousBuilds = $jobInDB->builds->search({outPath => $outPath}, {order_by => "id"});
        if (scalar(@previousBuilds) > 0) {
            foreach my $build (@previousBuilds) {
                if (!$build->finished) {
                    print "already scheduled as build ", $build->id, "\n";
                    $currentBuilds->{$build->id} = 1;
                    return;
                }
            }

            if (isValidPath($outPath)) {
                print "already done as build ", $previousBuilds[0]->id, "\n";
                # Mark the previous build as "current" so that it will
                # appear in the "latest" channel for this
                # project/jobset/job.
                $previousBuilds[0]->update({iscurrent => 1});
                $currentBuilds->{$previousBuilds[0]->id} = 1;
                return;
            }

            print "already done as build ", $previousBuilds[0]->id,
                "; rebuilding because it was garbage-collected\n";
        }

        # Nope, so add it.
        my $build = $jobInDB->builds->create(
            { finished => 0
            , timestamp => time()
            , description => $job->{description}
            , longdescription => $job->{longDescription}
            , license => $job->{license}
            , homepage => $job->{homepage}
            , maintainers => $job->{maintainers}
            , nixname => $job->{nixName}
            , drvpath => $drvPath
            , outpath => $outPath
            , system => $job->{system}
            , iscurrent => 1
            });

        print "added to queue as build ", $build->id, "\n";

        $currentBuilds->{$build->id} = 1;

        $build->create_related('buildschedulinginfo',
            { priority => $priority
            , busy => 0
            , locker => ""
            });

        # Collect the inputs of this build: the Nix expression itself
        # plus the alternative chosen for each argument of the job.
        my %inputs;
        $inputs{$jobset->nixexprinput} = $nixExprInput;
        foreach my $arg (@{$job->{arg}}) {
            $inputs{$arg->{name}} = $inputInfo->{$arg->{name}}->[$arg->{altnr}]
                || die "invalid input";
        }

        foreach my $name (keys %inputs) {
            my $input = $inputs{$name};
            $build->buildinputs_builds->create(
                { name => $name
                , type => $input->{type}
                , uri => $input->{uri}
                , revision => $input->{revision}
                , value => $input->{value}
                , dependency => $input->{id}
                , path => $input->{storePath} || "" # !!! temporary hack
                , sha256hash => $input->{sha256hash}
                });
        }
    });
}
# Store $errorMsg and the current time in the jobset's errormsg and
# errortime columns, inside a transaction.  This is best-effort: any
# exception raised while doing so is caught by the eval and ignored.
sub setJobsetError {
    my ($jobset, $errorMsg) = @_;

    my $recordError = sub {
        $jobset->update({ errormsg => $errorMsg, errortime => time });
    };

    eval { txn_do($db, $recordError) };
}
2009-03-09 13:04:46 +00:00
# Convert the fetched inputs (the hash filled in by fetchInputs: input
# name => array of input hashes) into a flat list of command-line
# arguments, one group of three per alternative:
#
#   string             --argstr NAME VALUE
#   boolean            --arg NAME VALUE
#   svn, path, build   --arg NAME "{ outPath = builtins.storePath PATH
#                      [; rev = "REV"] [; version = "VERSION"];}"
#
# Alternatives of any other (or missing) type contribute nothing.
# Plain string comparisons are used instead of given/when, which
# depends on the experimental `switch' feature and smartmatching.
sub inputsToArgs {
    my ($inputInfo) = @_;
    my @res = ();

    foreach my $input (keys %{$inputInfo}) {
        foreach my $alt (@{$inputInfo->{$input}}) {
            my $type = defined $alt->{type} ? $alt->{type} : "";

            if ($type eq "string") {
                push @res, "--argstr", $input, $alt->{value};
            }
            elsif ($type eq "boolean") {
                push @res, "--arg", $input, $alt->{value};
            }
            elsif ($type eq "svn" || $type eq "path" || $type eq "build") {
                my $expr = "{ outPath = builtins.storePath " . $alt->{storePath};
                $expr .= "; rev = \"" . $alt->{revision} . "\""
                    if defined $alt->{revision};
                $expr .= "; version = \"" . $alt->{version} . "\""
                    if defined $alt->{version};
                $expr .= ";}";
                push @res, "--arg", $input, $expr;
            }
        }
    }

    return @res;
}
# Return the arguments in random order.  Works on a copy, walking from
# the last position down to the second and exchanging each element
# with one chosen at random from the positions up to and including it.
sub permute {
    my @items = @_;

    my $last = $#items;
    while ($last > 0) {
        my $pick = int(rand($last + 1)); # 0 <= $pick <= $last
        ($items[$last], $items[$pick]) = ($items[$pick], $items[$last]);
        $last--;
    }

    return @items;
}
# Evaluate one jobset and schedule its jobs:
#
#   1. fetch all alternatives of all inputs;
#   2. run hydra_eval_jobs on the jobset's Nix expression and parse
#      its XML output;
#   3. call checkJob for every evaluated job, in random order;
#   4. in one transaction, update the `active' flag and error message
#      of every job of the jobset and clear the `iscurrent' flag of
#      builds that were not seen in this evaluation;
#   5. store the evaluation errors (possibly none) as the jobset's
#      error message.
#
# Dies if the input containing the Nix expression is missing or has
# more than one alternative, if evaluation fails, or if the XML output
# cannot be parsed.
sub checkJobset {
    my ($project, $jobset) = @_;
    my $inputInfo = {};

    # Fetch all values for all inputs.
    fetchInputs($project, $jobset, $inputInfo);

    # Evaluate the job expression.
    my $nixExprInput = $inputInfo->{$jobset->nixexprinput}->[0]
        or die "cannot find the input containing the job expression";
    die "multiple alternatives for the input containing the Nix expression are not supported"
        if scalar @{$inputInfo->{$jobset->nixexprinput}} != 1;
    my $nixExprPath = $nixExprInput->{storePath} . "/" . $jobset->nixexprpath;

    (my $res, my $jobsXml, my $stderr) = captureStdoutStderr(
        "hydra_eval_jobs", $nixExprPath, "--gc-roots-dir", getGCRootsDir,
        inputsToArgs($inputInfo));
    die "cannot evaluate the Nix expression containing the jobs:\n$stderr" unless $res;

    print STDERR "$stderr";

    my $jobs = XMLin($jobsXml,
                     ForceArray => ['error', 'job', 'arg'],
                     KeyAttr => [],
                     SuppressEmpty => '')
        or die "cannot parse XML output";

    # Schedule each successfully evaluated job.
    my %currentBuilds;
    foreach my $job (permute @{$jobs->{job}}) {
        next if $job->{jobName} eq "";
        print "considering job " . $job->{jobName} . "\n";
        checkJob($project, $jobset, $inputInfo, $nixExprInput, $job, \%currentBuilds);
    }

    txn_do($db, sub {

        # Mark all existing jobs that we haven't seen as inactive.
        my %jobNames;
        $jobNames{$_->{jobName}}++ foreach @{$jobs->{job}};

        # Error messages of failed evaluations, grouped by the
        # location reported in the error.
        my %failedJobNames;
        push @{$failedJobNames{$_->{location}}}, $_->{msg} foreach @{$jobs->{error}};

        $jobset->update({lastcheckedtime => time});

        foreach my $jobInDB ($jobset->jobs->all) {
            my $seen = $jobNames{$jobInDB->name} || $failedJobNames{$jobInDB->name};
            $jobInDB->update({active => $seen ? 1 : 0});

            if ($failedJobNames{$jobInDB->name}) {
                # Join with a real newline; a single-quoted '\n' would
                # insert a literal backslash followed by `n'.
                $jobInDB->update({errormsg => join "\n", @{$failedJobNames{$jobInDB->name}}});
            } else {
                $jobInDB->update({errormsg => undef});
            }
        }

        # Clear the "current" flag on all builds that are no longer
        # current.
        foreach my $build ($jobset->builds->search({iscurrent => 1})) {
            print "current is ", $build->id, "\n";
            $build->update({iscurrent => 0}) unless $currentBuilds{$build->id};
        }

    });

    # Store the errors messages for jobs that failed to evaluate.
    my $msg = "";
    foreach my $error (@{$jobs->{error}}) {
        # Describe the input alternatives the failing job was called
        # with; only string and boolean values are shown literally.
        my $bindings = "";
        foreach my $arg (@{$error->{arg}}) {
            my $input = $inputInfo->{$arg->{name}}->[$arg->{altnr}] or die "invalid input";
            $bindings .= ", " if $bindings ne "";
            $bindings .= $arg->{name} . " = ";

            my $type = defined $input->{type} ? $input->{type} : "";
            if ($type eq "string") {
                $bindings .= "\"" . $input->{value} . "\"";
            }
            elsif ($type eq "boolean") {
                $bindings .= $input->{value};
            }
            else {
                $bindings .= "...";
            }
        }
        $msg .= "at `" . $error->{location} . "' [$bindings]:\n" . $error->{msg} . "\n\n";
    }
    setJobsetError($jobset, $msg);
}
# Run checkJobset for one jobset without letting an exception escape.
# If checkJobset dies, the error is printed and, in one transaction,
# the jobset's last-checked time is updated and the error is stored
# as the jobset's error message.
sub checkJobsetWrapped {
    my ($project, $jobset) = @_;

    print "considering jobset ", $jobset->name, " in ", $project->name, "\n";

    eval { checkJobset($project, $jobset) };

    if ($@) {
        # Copy $@ right away; later evals would overwrite it.
        my $failure = $@;
        print "error evaluating jobset ", $jobset->name, ": $failure";

        my $recordFailure = sub {
            $jobset->update({ lastcheckedtime => time });
            setJobsetError($jobset, $failure);
        };
        txn_do($db, $recordFailure);
    }
}
2008-11-04 18:23:28 +00:00
# Check every jobset of every enabled project, one after the other.
sub checkJobs {
    my @enabledProjects = $db->resultset('Projects')->search({ enabled => 1 });

    for my $project (@enabledProjects) {
        print "considering project ", $project->name, "\n";
        for my $jobset ($project->jobsets->all) {
            checkJobsetWrapped($project, $jobset);
        }
    }
}
2009-03-05 12:32:14 +00:00
# For testing: when called with exactly two arguments (a project name
# and a jobset name), evaluate that single jobset, then exit.
if (scalar @ARGV == 2) {
    my ($projectName, $jobsetName) = @ARGV;
    my $jobset = $db->resultset('Jobsets')->find($projectName, $jobsetName) or die;
    checkJobsetWrapped($jobset->project, $jobset);
    exit 0;
}

# Normal operation: check all jobs forever, pausing 30 seconds between
# rounds.  An error in one round is printed and does not stop the loop.
for (;;) {
    eval { checkJobs; };
    print "$@" if $@;
    print "sleeping...\n";
    sleep 30;
}