Merge pull request #1083 from DeterminateSystems/faster-notifications

Faster notifications and buildQueued support
This commit is contained in:
Graham Christensen 2021-12-23 09:23:00 -05:00 committed by GitHub
commit 76962bfcb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 281 additions and 21 deletions

View file

@ -8,6 +8,12 @@ Notifications are passed from `hydra-queue-runner` to `hydra-notify` through Pos
Note that the notification format is subject to change and should not be considered an API. Integrate with `hydra-notify` instead of listening directly.
### `build_queued`
* **Payload:** Exactly one value, the ID of the build.
* **When:** Issued after the transaction inserting the build in to the database is committed. One notification is sent per new build.
* **Delivery Semantics:** Ephemeral. `hydra-notify` must be running to react to this event. No record of this event is stored.
### `build_started`
* **Payload:** Exactly one value, the ID of the build.

View file

@ -3,10 +3,12 @@ package Hydra::Event;
use strict;
use warnings;
use Hydra::Event::BuildFinished;
use Hydra::Event::BuildQueued;
use Hydra::Event::BuildStarted;
use Hydra::Event::StepFinished;
my %channels_to_events = (
build_queued => \&Hydra::Event::BuildQueued::parse,
build_started => \&Hydra::Event::BuildStarted::parse,
step_finished => \&Hydra::Event::StepFinished::parse,
build_finished => \&Hydra::Event::BuildFinished::parse,
@ -36,6 +38,12 @@ sub new_event {
}, $self;
}
sub interestedIn {
my ($self, $plugin) = @_;
return $self->{"event"}->interestedIn($plugin);
}
sub execute {
my ($self, $db, $plugin) = @_;
return $self->{"event"}->execute($db, $plugin);

View file

@ -27,6 +27,11 @@ sub new {
}, $self;
}
sub interestedIn {
my ($self, $plugin) = @_;
return int(defined($plugin->can('buildFinished')));
}
sub load {
my ($self, $db) = @_;

View file

@ -0,0 +1,52 @@
package Hydra::Event::BuildQueued;
use strict;
use warnings;
sub parse :prototype(@) {
unless (@_ == 1) {
die "build_queued: payload takes only one argument, but ", scalar(@_), " were given";
}
my ($build_id) = @_;
unless ($build_id =~ /^\d+$/) {
die "build_queued: payload argument should be an integer, but '", $build_id, "' was given"
}
return Hydra::Event::BuildQueued->new(int($build_id));
}
sub new {
my ($self, $id) = @_;
return bless {
"build_id" => $id,
"build" => undef
}, $self;
}
sub interestedIn {
my ($self, $plugin) = @_;
return int(defined($plugin->can('buildQueued')));
}
sub load {
my ($self, $db) = @_;
if (!defined($self->{"build"})) {
$self->{"build"} = $db->resultset('Builds')->find($self->{"build_id"})
or die "build $self->{'build_id'} does not exist\n";
}
}
sub execute {
my ($self, $db, $plugin) = @_;
$self->load($db);
$plugin->buildQueued($self->{"build"});
return 1;
}
1;

View file

@ -25,6 +25,11 @@ sub new {
}, $self;
}
sub interestedIn {
my ($self, $plugin) = @_;
return int(defined($plugin->can('buildStarted')));
}
sub load {
my ($self, $db) = @_;

View file

@ -34,6 +34,11 @@ sub new :prototype($$$) {
}, $self;
}
sub interestedIn {
my ($self, $plugin) = @_;
return int(defined($plugin->can('stepFinished')));
}
sub load {
my ($self, $db) = @_;

View file

@ -25,29 +25,35 @@ sub instantiate {
return @$plugins;
}
# Called when build $build has been queued.
sub buildQueued {
my ($self, $build) = @_;
}
# To implement behaviors in response to the following events, implement
# the function in your plugin and it will be executed by hydra-notify.
#
# See the tests in t/Event/*.t for arguments, and the documentation for
# notify events for semantics.
#
# # Called when build $build has been queued.
# sub buildQueued {
# my ($self, $build) = @_;
# }
# Called when build $build has started.
sub buildStarted {
my ($self, $build) = @_;
}
# # Called when build $build has started.
# sub buildStarted {
# my ($self, $build) = @_;
# }
# Called when build $build has finished. If the build failed, then
# $dependents is an array ref to a list of builds that have also
# failed as a result (i.e. because they depend on $build or a failed
# dependeny of $build).
sub buildFinished {
my ($self, $build, $dependents) = @_;
}
# # Called when build $build has finished. If the build failed, then
# # $dependents is an array ref to a list of builds that have also
# # failed as a result (i.e. because they depend on $build or a failed
# # dependeny of $build).
# sub buildFinished {
# my ($self, $build, $dependents) = @_;
# }
# Called when step $step has finished. The build log is stored in the
# file $logPath (bzip2-compressed).
sub stepFinished {
my ($self, $step, $logPath) = @_;
}
# # Called when step $step has finished. The build log is stored in the
# # file $logPath (bzip2-compressed).
# sub stepFinished {
# my ($self, $step, $logPath) = @_;
# }
# Called to determine the set of supported input types. The plugin
# should add these to the $inputTypes hashref, e.g. $inputTypes{'svn'}

View file

@ -14,7 +14,6 @@ use Data::Dumper;
my $CONFIG_SECTION = "git-input";
sub supportedInputTypes {
my ($self, $inputTypes) = @_;
$inputTypes->{'git'} = 'Git checkout';

View file

@ -112,6 +112,16 @@ sub new {
type => "counter",
help => "Number of tasks that have been requeued after a failure."
);
$prometheus->declare(
"notify_plugin_no_such_plugin",
type => "counter",
help => "Number of tasks that have not been processed because the plugin does not exist."
);
$prometheus->declare(
"notify_plugin_not_interested",
type => "counter",
help => "Number of tasks that have not been processed because the plugin was not interested in the event."
);
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
@ -185,6 +195,11 @@ sub dispatch_task {
return 0;
}
if (!$task->{"event"}->interestedIn($plugin)) {
$self->{"prometheus"}->inc("notify_plugin_not_interested", $event_labels);
return 0;
}
$self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
eval {
my $start_time = [gettimeofday()];

View file

@ -486,6 +486,7 @@ sub checkBuild {
$buildMap->{$build->id} = { id => $build->id, jobName => $jobName, new => 1, drvPath => $drvPath };
$$jobOutPathMap{$jobName . "\t" . $firstOutputPath} = $build->id;
$db->storage->dbh->do("notify build_queued, ?", undef, $build->id);
print STDERR "added build ${\$build->id} (${\$jobset->get_column('project')}:${\$jobset->name}:$jobName)\n";
});

View file

@ -93,6 +93,7 @@ my $task_dispatcher = Hydra::TaskDispatcher->new(
my $dbh = $db->storage->dbh;
my $listener = Hydra::PostgresListener->new($dbh);
$listener->subscribe("build_queued");
$listener->subscribe("build_started");
$listener->subscribe("build_finished");
$listener->subscribe("step_finished");

View file

@ -54,6 +54,28 @@ my $jobset = createBaseJobset("basic", "basic.nix", $ctx{jobsdir});
ok(evalSucceeds($jobset), "Evaluating jobs/basic.nix should exit with return code 0");
is(nrQueuedBuildsForJobset($jobset), 3, "Evaluating jobs/basic.nix should result in 3 builds");
subtest "interested" => sub {
my $event = Hydra::Event::BuildFinished->new(123, []);
subtest "A plugin which does not implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => ();
is($event->interestedIn($plugin), 0, "The plugin is not interesting.");
};
subtest "A plugin which does implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => (
add => [
"buildFinished" => sub {}
]
);
is($event->interestedIn($plugin), 1, "The plugin is interesting.");
};
};
subtest "load" => sub {
my ($build, $dependent_a, $dependent_b) = $db->resultset('Builds')->search(
{ },

91
t/Event/BuildQueued.t Normal file
View file

@ -0,0 +1,91 @@
use strict;
use warnings;
use Setup;
use Hydra::Event;
use Hydra::Event::BuildQueued;
use Test2::V0;
use Test2::Tools::Exception;
use Test2::Tools::Mock qw(mock_obj);
my $ctx = test_context();
my $db = $ctx->db();
my $builds = $ctx->makeAndEvaluateJobset(
expression => "basic.nix"
);
subtest "Parsing build_queued" => sub {
like(
dies { Hydra::Event::parse_payload("build_queued", "") },
qr/one argument/,
"empty payload"
);
like(
dies { Hydra::Event::parse_payload("build_queued", "abc123\tabc123") },
qr/only one argument/,
"two arguments"
);
like(
dies { Hydra::Event::parse_payload("build_queued", "abc123") },
qr/should be an integer/,
"not an integer"
);
is(
Hydra::Event::parse_payload("build_queued", "19"),
Hydra::Event::BuildQueued->new(19),
"Valid parse"
);
};
subtest "interested" => sub {
my $event = Hydra::Event::BuildQueued->new(123, []);
subtest "A plugin which does not implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => ();
is($event->interestedIn($plugin), 0, "The plugin is not interesting.");
};
subtest "A plugin which does implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => (
add => [
"buildQueued" => sub {}
]
);
is($event->interestedIn($plugin), 1, "The plugin is interesting.");
};
};
subtest "load" => sub {
my $build = $builds->{"empty_dir"};
my $event = Hydra::Event::BuildQueued->new($build->id);
$event->load($db);
is($event->{"build"}->id, $build->id, "The build record matches.");
# Create a fake "plugin" with a buildQueued sub, the sub sets this
# global passedBuild variable.
my $passedBuild;
my $plugin = {};
my $mock = mock_obj $plugin => (
add => [
"buildQueued" => sub {
my ($self, $build) = @_;
$passedBuild = $build;
}
]
);
$event->execute($db, $plugin);
is($passedBuild->id, $build->id, "The plugin's buildQueued hook is called with the proper build");
};
done_testing;

View file

@ -45,6 +45,28 @@ subtest "Parsing build_started" => sub {
);
};
subtest "interested" => sub {
my $event = Hydra::Event::BuildStarted->new(123, []);
subtest "A plugin which does not implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => ();
is($event->interestedIn($plugin), 0, "The plugin is not interesting.");
};
subtest "A plugin which does implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => (
add => [
"buildStarted" => sub {}
]
);
is($event->interestedIn($plugin), 1, "The plugin is interesting.");
};
};
subtest "load" => sub {
my $build = $db->resultset('Builds')->search(
{ },

View file

@ -64,6 +64,28 @@ subtest "Parsing step_finished" => sub {
);
};
subtest "interested" => sub {
my $event = Hydra::Event::StepFinished->new(123, []);
subtest "A plugin which does not implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => ();
is($event->interestedIn($plugin), 0, "The plugin is not interesting.");
};
subtest "A plugin which does implement the API" => sub {
my $plugin = {};
my $mock = mock_obj $plugin => (
add => [
"stepFinished" => sub {}
]
);
is($event->interestedIn($plugin), 1, "The plugin is interesting.");
};
};
subtest "load" => sub {
my $step = $db->resultset('BuildSteps')->search(