forked from lix-project/hydra
hydra-notify: extract runPluginsForEvent to a TaskDispatcher
This commit is contained in:
parent
bec44614f4
commit
1f2adf61aa
15
src/lib/Hydra/Task.pm
Normal file
15
src/lib/Hydra/Task.pm
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package Hydra::Task;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my ($self, $event, $plugin_name) = @_;
|
||||||
|
|
||||||
|
return bless {
|
||||||
|
"event" => $event,
|
||||||
|
"plugin_name" => $plugin_name,
|
||||||
|
}, $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
157
src/lib/Hydra/TaskDispatcher.pm
Normal file
157
src/lib/Hydra/TaskDispatcher.pm
Normal file
|
@ -0,0 +1,157 @@
|
||||||
|
package Hydra::TaskDispatcher;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
use Hydra::Task;
|
||||||
|
use Time::HiRes qw( gettimeofday tv_interval );
|
||||||
|
|
||||||
|
=head1 Hydra::TaskDispatcher
|
||||||
|
|
||||||
|
Excecute many plugins with Hydra::Event as its input.
|
||||||
|
|
||||||
|
The TaskDispatcher is responsible for dealing with fanout
|
||||||
|
from one incoming Event being executed across many plugins,
|
||||||
|
or one Event being executed against a single plugin by first
|
||||||
|
wrapping it in a Task.
|
||||||
|
|
||||||
|
Its execution model is based on creating a Hydra::Task for
|
||||||
|
each plugin's execution. The task represents the name of
|
||||||
|
the plugin to run and the Event to process.
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
=head2 new
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$dbh>
|
||||||
|
L<DBI::db> The database connection.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=item C<$prometheus>
|
||||||
|
L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny
|
||||||
|
or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=item C<%plugins>
|
||||||
|
L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my ($self, $db, $prometheus, $plugins) = @_;
|
||||||
|
|
||||||
|
$prometheus->declare(
|
||||||
|
"notify_plugin_executions",
|
||||||
|
type => "counter",
|
||||||
|
help => "Number of times each plugin has been called by channel."
|
||||||
|
);
|
||||||
|
$prometheus->declare(
|
||||||
|
"notify_plugin_runtime",
|
||||||
|
type => "histogram",
|
||||||
|
help => "Number of seconds spent executing each plugin by channel."
|
||||||
|
);
|
||||||
|
$prometheus->declare(
|
||||||
|
"notify_plugin_success",
|
||||||
|
type => "counter",
|
||||||
|
help => "Number of successful executions of this plugin on this channel."
|
||||||
|
);
|
||||||
|
$prometheus->declare(
|
||||||
|
"notify_plugin_error",
|
||||||
|
type => "counter",
|
||||||
|
help => "Number of failed executions of this plugin on this channel."
|
||||||
|
);
|
||||||
|
|
||||||
|
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
|
||||||
|
|
||||||
|
my $obj = bless {
|
||||||
|
"db" => $db,
|
||||||
|
"prometheus" => $prometheus,
|
||||||
|
"plugins_by_name" => \%plugins_by_name,
|
||||||
|
}, $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
=head2 dispatch_event
|
||||||
|
|
||||||
|
Execute each configured plugin against the provided L<Hydra::Event>.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$event>
|
||||||
|
|
||||||
|
L<Hydra::Event> the event, usually from L<Hydra::PostgresListener>.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub dispatch_event {
|
||||||
|
my ($self, $event) = @_;
|
||||||
|
|
||||||
|
foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) {
|
||||||
|
my $task = Hydra::Task->new($event, $plugin_name);
|
||||||
|
$self->dispatch_task($task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
=head2 dispatch_task
|
||||||
|
|
||||||
|
Execute a specifi plugin against the provided L<Hydra::Task>.
|
||||||
|
The Task includes information about what plugin should be executed.
|
||||||
|
If the provided plugin does not exist, an error logged is logged and the
|
||||||
|
function returns falsey.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$task>
|
||||||
|
|
||||||
|
L<Hydra::Task> the task, usually from L<Hydra::Shema::Result::TaskRetries>.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=cut
|
||||||
|
sub dispatch_task {
|
||||||
|
my ($self, $task) = @_;
|
||||||
|
|
||||||
|
my $channel_name = $task->{"event"}->{'channel_name'};
|
||||||
|
my $plugin_name = $task->{"plugin_name"};
|
||||||
|
my $event_labels = {
|
||||||
|
channel => $channel_name,
|
||||||
|
plugin => $plugin_name,
|
||||||
|
};
|
||||||
|
|
||||||
|
my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
|
||||||
|
|
||||||
|
if (!defined($plugin)) {
|
||||||
|
$self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels);
|
||||||
|
print STDERR "No plugin named $plugin_name\n";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
$self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
|
||||||
|
eval {
|
||||||
|
my $start_time = [gettimeofday()];
|
||||||
|
|
||||||
|
$task->{"event"}->execute($self->{"db"}, $plugin);
|
||||||
|
|
||||||
|
$self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
|
||||||
|
$self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
|
||||||
|
1;
|
||||||
|
} or do {
|
||||||
|
$self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
|
||||||
|
print STDERR "error running $channel_name hooks: $@\n";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
|
@ -10,9 +10,9 @@ use Hydra::Helper::AddBuilds;
|
||||||
use Hydra::Helper::Nix;
|
use Hydra::Helper::Nix;
|
||||||
use Hydra::Plugin;
|
use Hydra::Plugin;
|
||||||
use Hydra::PostgresListener;
|
use Hydra::PostgresListener;
|
||||||
|
use Hydra::TaskDispatcher;
|
||||||
use Parallel::ForkManager;
|
use Parallel::ForkManager;
|
||||||
use Prometheus::Tiny::Shared;
|
use Prometheus::Tiny::Shared;
|
||||||
use Time::HiRes qw( gettimeofday tv_interval );
|
|
||||||
|
|
||||||
STDERR->autoflush(1);
|
STDERR->autoflush(1);
|
||||||
STDOUT->autoflush(1);
|
STDOUT->autoflush(1);
|
||||||
|
@ -25,26 +25,6 @@ my $prom = Prometheus::Tiny::Shared->new;
|
||||||
# Add a new declaration for any new metrics you create. Metrics which are
|
# Add a new declaration for any new metrics you create. Metrics which are
|
||||||
# not pre-declared disappear when their value is null. See:
|
# not pre-declared disappear when their value is null. See:
|
||||||
# https://metacpan.org/pod/Prometheus::Tiny#declare
|
# https://metacpan.org/pod/Prometheus::Tiny#declare
|
||||||
$prom->declare(
|
|
||||||
"notify_plugin_executions",
|
|
||||||
type => "counter",
|
|
||||||
help => "Number of times each plugin has been called by channel."
|
|
||||||
);
|
|
||||||
$prom->declare(
|
|
||||||
"notify_plugin_runtime",
|
|
||||||
type => "histogram",
|
|
||||||
help => "Number of seconds spent executing each plugin by channel."
|
|
||||||
);
|
|
||||||
$prom->declare(
|
|
||||||
"notify_plugin_success",
|
|
||||||
type => "counter",
|
|
||||||
help => "Number of successful executions of this plugin on this channel."
|
|
||||||
);
|
|
||||||
$prom->declare(
|
|
||||||
"notify_plugin_error",
|
|
||||||
type => "counter",
|
|
||||||
help => "Number of failed executions of this plugin on this channel."
|
|
||||||
);
|
|
||||||
$prom->declare(
|
$prom->declare(
|
||||||
"event_loop_iterations",
|
"event_loop_iterations",
|
||||||
type => "counter",
|
type => "counter",
|
||||||
|
@ -93,6 +73,7 @@ GetOptions(
|
||||||
my $db = Hydra::Model::DB->new();
|
my $db = Hydra::Model::DB->new();
|
||||||
|
|
||||||
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
|
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
|
||||||
|
my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins]);
|
||||||
|
|
||||||
my $dbh = $db->storage->dbh;
|
my $dbh = $db->storage->dbh;
|
||||||
|
|
||||||
|
@ -102,39 +83,16 @@ $listener->subscribe("build_finished");
|
||||||
$listener->subscribe("step_finished");
|
$listener->subscribe("step_finished");
|
||||||
$listener->subscribe("hydra_notify_dump_metrics");
|
$listener->subscribe("hydra_notify_dump_metrics");
|
||||||
|
|
||||||
sub runPluginsForEvent {
|
|
||||||
my ($event) = @_;
|
|
||||||
|
|
||||||
my $channelName = $event->{'channel_name'};
|
|
||||||
|
|
||||||
foreach my $plugin (@plugins) {
|
|
||||||
$prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin });
|
|
||||||
eval {
|
|
||||||
my $startTime = [gettimeofday()];
|
|
||||||
$event->execute($db, $plugin);
|
|
||||||
|
|
||||||
$prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin });
|
|
||||||
$prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin });
|
|
||||||
1;
|
|
||||||
} or do {
|
|
||||||
$prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin });
|
|
||||||
print STDERR "error running $event->{'channel_name'} hooks: $@\n";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Process builds that finished while hydra-notify wasn't running.
|
# Process builds that finished while hydra-notify wasn't running.
|
||||||
for my $build ($db->resultset('Builds')->search(
|
for my $build ($db->resultset('Builds')->search(
|
||||||
{ notificationpendingsince => { '!=', undef } }))
|
{ notificationpendingsince => { '!=', undef } }))
|
||||||
{
|
{
|
||||||
print STDERR "sending notifications for build ${\$build->id}...\n";
|
print STDERR "sending notifications for build ${\$build->id}...\n";
|
||||||
|
|
||||||
|
my $event = Hydra::Event->new_event("build_finished", $build->id);
|
||||||
my $event = Hydra::Event::BuildFinished->new($build->id);
|
$task_dispatcher->dispatch_event($event);
|
||||||
runPluginsForEvent($event);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
# Process incoming notifications.
|
# Process incoming notifications.
|
||||||
while (!$queued_only) {
|
while (!$queued_only) {
|
||||||
$prom->inc("event_loop_iterations");
|
$prom->inc("event_loop_iterations");
|
||||||
|
@ -154,7 +112,7 @@ while (!$queued_only) {
|
||||||
|
|
||||||
eval {
|
eval {
|
||||||
my $event = Hydra::Event->new_event($channelName, $message->{"payload"});
|
my $event = Hydra::Event->new_event($channelName, $message->{"payload"});
|
||||||
runPluginsForEvent($event);
|
$task_dispatcher->dispatch_event($event);
|
||||||
|
|
||||||
1;
|
1;
|
||||||
} or do {
|
} or do {
|
||||||
|
|
122
t/TaskDispatcher.t
Normal file
122
t/TaskDispatcher.t
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
use Setup;
|
||||||
|
|
||||||
|
use Hydra::TaskDispatcher;
|
||||||
|
use Prometheus::Tiny::Shared;
|
||||||
|
|
||||||
|
use Test2::V0;
|
||||||
|
use Test2::Tools::Mock qw(mock_obj);
|
||||||
|
|
||||||
|
my $db = "bogus db";
|
||||||
|
my $prometheus = Prometheus::Tiny::Shared->new;
|
||||||
|
|
||||||
|
sub make_noop_plugin {
|
||||||
|
my ($name) = @_;
|
||||||
|
my $plugin = {
|
||||||
|
"name" => $name,
|
||||||
|
};
|
||||||
|
my $mock_plugin = mock_obj $plugin => ();
|
||||||
|
|
||||||
|
return $mock_plugin;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub make_fake_event {
|
||||||
|
my ($channel_name) = @_;
|
||||||
|
|
||||||
|
my $event = {
|
||||||
|
channel_name => $channel_name,
|
||||||
|
called_with => [],
|
||||||
|
};
|
||||||
|
my $mock_event = mock_obj $event => (
|
||||||
|
add => [
|
||||||
|
"execute" => sub {
|
||||||
|
my ($self, $db, $plugin) = @_;
|
||||||
|
push @{$self->{"called_with"}}, $plugin;
|
||||||
|
}
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
return $mock_event;
|
||||||
|
}
|
||||||
|
|
||||||
|
sub make_failing_event {
|
||||||
|
my ($channel_name) = @_;
|
||||||
|
|
||||||
|
my $event = {
|
||||||
|
channel_name => $channel_name,
|
||||||
|
called_with => [],
|
||||||
|
};
|
||||||
|
my $mock_event = mock_obj $event => (
|
||||||
|
add => [
|
||||||
|
"execute" => sub {
|
||||||
|
my ($self, $db, $plugin) = @_;
|
||||||
|
push @{$self->{"called_with"}}, $plugin;
|
||||||
|
die "Failing plugin."
|
||||||
|
}
|
||||||
|
]
|
||||||
|
);
|
||||||
|
|
||||||
|
return $mock_event;
|
||||||
|
}
|
||||||
|
|
||||||
|
subtest "dispatch_event" => sub {
|
||||||
|
subtest "every plugin gets called once, even if it fails all of them." => sub {
|
||||||
|
my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
|
||||||
|
|
||||||
|
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
|
||||||
|
|
||||||
|
my $event = make_failing_event("bogus-channel");
|
||||||
|
$dispatcher->dispatch_event($event);
|
||||||
|
|
||||||
|
is(@{$event->{"called_with"}}, 2, "Both plugins should be called");
|
||||||
|
|
||||||
|
my @expected_names = [ "bogus-1", "bogus-2" ];
|
||||||
|
my @actual_names = sort([
|
||||||
|
$event->{"called_with"}[0]->name,
|
||||||
|
$event->{"called_with"}[1]->name
|
||||||
|
]);
|
||||||
|
|
||||||
|
is(
|
||||||
|
@actual_names,
|
||||||
|
@expected_names,
|
||||||
|
"Both plugins should be executed, but not in any particular order."
|
||||||
|
);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
subtest "dispatch_task" => sub {
|
||||||
|
subtest "every plugin gets called once" => sub {
|
||||||
|
my $bogus_plugin = make_noop_plugin("bogus-1");
|
||||||
|
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
|
||||||
|
|
||||||
|
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
|
||||||
|
|
||||||
|
my $event = make_fake_event("bogus-channel");
|
||||||
|
my $task = Hydra::Task->new($event, ref $bogus_plugin);
|
||||||
|
is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy.");
|
||||||
|
|
||||||
|
is(@{$event->{"called_with"}}, 1, "Just one plugin should be called");
|
||||||
|
|
||||||
|
is(
|
||||||
|
$event->{"called_with"}[0]->name,
|
||||||
|
"bogus-1",
|
||||||
|
"Just bogus-1 should be executed."
|
||||||
|
);
|
||||||
|
};
|
||||||
|
subtest "a task with an invalid plugin is not fatal" => sub {
|
||||||
|
my $bogus_plugin = make_noop_plugin("bogus-1");
|
||||||
|
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
|
||||||
|
|
||||||
|
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
|
||||||
|
|
||||||
|
my $event = make_fake_event("bogus-channel");
|
||||||
|
my $task = Hydra::Task->new($event, "this-plugin-does-not-exist");
|
||||||
|
is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey.");
|
||||||
|
|
||||||
|
is(@{$event->{"called_with"}}, 0, "No plugins are called");
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
done_testing;
|
Loading…
Reference in a new issue