#! /usr/bin/env perl use strict; use utf8; use Hydra::Plugin; use Hydra::Helper::Nix; use Hydra::Helper::AddBuilds; use IO::Select; use Getopt::Long; STDERR->autoflush(1); STDOUT->autoflush(1); binmode STDERR, ":encoding(utf8)"; my $queued_only; GetOptions( "queued-only" => \$queued_only ) or exit 1; my $config = getHydraConfig(); my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my $dbh = $db->storage->dbh; $dbh->do("listen build_started"); $dbh->do("listen build_finished"); $dbh->do("listen step_finished"); sub buildStarted { my ($buildId) = @_; my $build = $db->resultset('Builds')->find($buildId) or die "build $buildId does not exist\n"; foreach my $plugin (@plugins) { eval { $plugin->buildStarted($build); 1; } or do { print STDERR "error with $plugin->buildStarted: $@\n"; } } } sub buildFinished { my ($build, @deps) = @_; my $project = $build->project; my $jobsetName = $build->get_column('jobset'); if (length($project->declfile) && $jobsetName eq ".jobsets" && $build->iscurrent) { handleDeclarativeJobsetBuild($db, $project, $build); } my @dependents; foreach my $id (@deps) { my $dep = $db->resultset('Builds')->find($id) or die "build $id does not exist\n"; push @dependents, $dep; } foreach my $plugin (@plugins) { eval { $plugin->buildFinished($build, [@dependents]); 1; } or do { print STDERR "error with $plugin->buildFinished: $@\n"; } } # We have to iterate through all dependents as well, and if they are finished # to mark their notificationpendingsince. # Otherwise, the dependent builds will remain with notificationpendingsince set # until hydra-notify is started, as buildFinished is never emitted for them. foreach my $b ($build, @dependents) { $b->update({ notificationpendingsince => undef }) if $b->finished; } } sub stepFinished { my ($buildId, $stepNr, $logPath) = @_; my $build = $db->resultset('Builds')->find($buildId) or die "build $buildId does not exist\n"; my $step = $build->buildsteps->find({stepnr => $stepNr}) or die "step $stepNr does not exist\n"; $logPath = undef if $logPath eq "-"; foreach my $plugin (@plugins) { eval { $plugin->stepFinished($step, $logPath); 1; } or do { print STDERR "error with $plugin->stepFinished: $@\n"; } } } # Process builds that finished while hydra-notify wasn't running. for my $build ($db->resultset('Builds')->search( { notificationpendingsince => { '!=', undef } })) { my $buildId = $build->id; print STDERR "sending notifications for build ${\$buildId}...\n"; buildFinished($build); } # Process incoming notifications. my $fd = $dbh->func("getfd"); my $sel = IO::Select->new($fd); while (!$queued_only) { $sel->can_read; while (my $notify = $dbh->func("pg_notifies")) { my ($channelName, $pid, $payload) = @$notify; #print STDERR "got '$channelName' from $pid: $payload\n"; my @payload = split /\t/, $payload; eval { if ($channelName eq "build_started") { buildStarted(int($payload[0])); } elsif ($channelName eq "build_finished") { my $buildId = int($payload[0]); my $build = $db->resultset('Builds')->find($buildId) or die "build $buildId does not exist\n"; buildFinished($build, @payload[1..$#payload]); } elsif ($channelName eq "step_finished") { stepFinished(int($payload[0]), int($payload[1])); } 1; } or do { print STDERR "error processing message '$payload' on channel '$channelName': $@\n"; } } }