diff --git a/src/lib/Hydra/Task.pm b/src/lib/Hydra/Task.pm new file mode 100644 index 00000000..31d57d3c --- /dev/null +++ b/src/lib/Hydra/Task.pm @@ -0,0 +1,15 @@ +package Hydra::Task; + +use strict; +use warnings; + +sub new { + my ($self, $event, $plugin_name) = @_; + + return bless { + "event" => $event, + "plugin_name" => $plugin_name, + }, $self; +} + +1; diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm new file mode 100644 index 00000000..425a89a3 --- /dev/null +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -0,0 +1,157 @@ +package Hydra::TaskDispatcher; + +use strict; +use warnings; +use Hydra::Task; +use Time::HiRes qw( gettimeofday tv_interval ); + +=head1 Hydra::TaskDispatcher + +Excecute many plugins with Hydra::Event as its input. + +The TaskDispatcher is responsible for dealing with fanout +from one incoming Event being executed across many plugins, +or one Event being executed against a single plugin by first +wrapping it in a Task. + +Its execution model is based on creating a Hydra::Task for +each plugin's execution. The task represents the name of +the plugin to run and the Event to process. + +=cut + +=head2 new + +Arguments: + +=over 1 + +=item C<$dbh> +L<DBI::db> The database connection. + +=back + +=item C<$prometheus> +L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny +or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus. + +=back + +=item C<%plugins> +L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against. + +=back + +=cut + +sub new { + my ($self, $db, $prometheus, $plugins) = @_; + + $prometheus->declare( + "notify_plugin_executions", + type => "counter", + help => "Number of times each plugin has been called by channel." + ); + $prometheus->declare( + "notify_plugin_runtime", + type => "histogram", + help => "Number of seconds spent executing each plugin by channel." + ); + $prometheus->declare( + "notify_plugin_success", + type => "counter", + help => "Number of successful executions of this plugin on this channel." + ); + $prometheus->declare( + "notify_plugin_error", + type => "counter", + help => "Number of failed executions of this plugin on this channel." + ); + + my %plugins_by_name = map { ref $_ => $_ } @{$plugins}; + + my $obj = bless { + "db" => $db, + "prometheus" => $prometheus, + "plugins_by_name" => \%plugins_by_name, + }, $self; +} + +=head2 dispatch_event + +Execute each configured plugin against the provided L<Hydra::Event>. + +Arguments: + +=over 1 + +=item C<$event> + +L<Hydra::Event> the event, usually from L<Hydra::PostgresListener>. + +=back + +=cut + +sub dispatch_event { + my ($self, $event) = @_; + + foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) { + my $task = Hydra::Task->new($event, $plugin_name); + $self->dispatch_task($task); + } +} + +=head2 dispatch_task + +Execute a specifi plugin against the provided L<Hydra::Task>. +The Task includes information about what plugin should be executed. +If the provided plugin does not exist, an error logged is logged and the +function returns falsey. + +Arguments: + +=over 1 + +=item C<$task> + +L<Hydra::Task> the task, usually from L<Hydra::Shema::Result::TaskRetries>. + +=back + +=cut +sub dispatch_task { + my ($self, $task) = @_; + + my $channel_name = $task->{"event"}->{'channel_name'}; + my $plugin_name = $task->{"plugin_name"}; + my $event_labels = { + channel => $channel_name, + plugin => $plugin_name, + }; + + my $plugin = $self->{"plugins_by_name"}->{$plugin_name}; + + if (!defined($plugin)) { + $self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels); + print STDERR "No plugin named $plugin_name\n"; + return 0; + } + + $self->{"prometheus"}->inc("notify_plugin_executions", $event_labels); + eval { + my $start_time = [gettimeofday()]; + + $task->{"event"}->execute($self->{"db"}, $plugin); + + $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels); + $self->{"prometheus"}->inc("notify_plugin_success", $event_labels); + 1; + } or do { + $self->{"prometheus"}->inc("notify_plugin_error", $event_labels); + print STDERR "error running $channel_name hooks: $@\n"; + return 0; + } +} + +1; diff --git a/src/script/hydra-notify b/src/script/hydra-notify index 770f0620..42b1978e 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -10,9 +10,9 @@ use Hydra::Helper::AddBuilds; use Hydra::Helper::Nix; use Hydra::Plugin; use Hydra::PostgresListener; +use Hydra::TaskDispatcher; use Parallel::ForkManager; use Prometheus::Tiny::Shared; -use Time::HiRes qw( gettimeofday tv_interval ); STDERR->autoflush(1); STDOUT->autoflush(1); @@ -25,26 +25,6 @@ my $prom = Prometheus::Tiny::Shared->new; # Add a new declaration for any new metrics you create. Metrics which are # not pre-declared disappear when their value is null. See: # https://metacpan.org/pod/Prometheus::Tiny#declare -$prom->declare( - "notify_plugin_executions", - type => "counter", - help => "Number of times each plugin has been called by channel." -); -$prom->declare( - "notify_plugin_runtime", - type => "histogram", - help => "Number of seconds spent executing each plugin by channel." -); -$prom->declare( - "notify_plugin_success", - type => "counter", - help => "Number of successful executions of this plugin on this channel." -); -$prom->declare( - "notify_plugin_error", - type => "counter", - help => "Number of failed executions of this plugin on this channel." -); $prom->declare( "event_loop_iterations", type => "counter", @@ -93,6 +73,7 @@ GetOptions( my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); +my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins]); my $dbh = $db->storage->dbh; @@ -102,39 +83,16 @@ $listener->subscribe("build_finished"); $listener->subscribe("step_finished"); $listener->subscribe("hydra_notify_dump_metrics"); -sub runPluginsForEvent { - my ($event) = @_; - - my $channelName = $event->{'channel_name'}; - - foreach my $plugin (@plugins) { - $prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin }); - eval { - my $startTime = [gettimeofday()]; - $event->execute($db, $plugin); - - $prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin }); - $prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin }); - 1; - } or do { - $prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin }); - print STDERR "error running $event->{'channel_name'} hooks: $@\n"; - } - } -} - # Process builds that finished while hydra-notify wasn't running. for my $build ($db->resultset('Builds')->search( { notificationpendingsince => { '!=', undef } })) { print STDERR "sending notifications for build ${\$build->id}...\n"; - - my $event = Hydra::Event::BuildFinished->new($build->id); - runPluginsForEvent($event); + my $event = Hydra::Event->new_event("build_finished", $build->id); + $task_dispatcher->dispatch_event($event); } - # Process incoming notifications. while (!$queued_only) { $prom->inc("event_loop_iterations"); @@ -154,7 +112,7 @@ while (!$queued_only) { eval { my $event = Hydra::Event->new_event($channelName, $message->{"payload"}); - runPluginsForEvent($event); + $task_dispatcher->dispatch_event($event); 1; } or do { diff --git a/t/TaskDispatcher.t b/t/TaskDispatcher.t new file mode 100644 index 00000000..ef3a8579 --- /dev/null +++ b/t/TaskDispatcher.t @@ -0,0 +1,122 @@ +use strict; +use warnings; +use Setup; + +use Hydra::TaskDispatcher; +use Prometheus::Tiny::Shared; + +use Test2::V0; +use Test2::Tools::Mock qw(mock_obj); + +my $db = "bogus db"; +my $prometheus = Prometheus::Tiny::Shared->new; + +sub make_noop_plugin { + my ($name) = @_; + my $plugin = { + "name" => $name, + }; + my $mock_plugin = mock_obj $plugin => (); + + return $mock_plugin; +} + +sub make_fake_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + } + ] + ); + + return $mock_event; +} + +sub make_failing_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + die "Failing plugin." + } + ] + ); + + return $mock_event; +} + +subtest "dispatch_event" => sub { + subtest "every plugin gets called once, even if it fails all of them." => sub { + my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_failing_event("bogus-channel"); + $dispatcher->dispatch_event($event); + + is(@{$event->{"called_with"}}, 2, "Both plugins should be called"); + + my @expected_names = [ "bogus-1", "bogus-2" ]; + my @actual_names = sort([ + $event->{"called_with"}[0]->name, + $event->{"called_with"}[1]->name + ]); + + is( + @actual_names, + @expected_names, + "Both plugins should be executed, but not in any particular order." + ); + }; +}; + +subtest "dispatch_task" => sub { + subtest "every plugin gets called once" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, ref $bogus_plugin); + is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy."); + + is(@{$event->{"called_with"}}, 1, "Just one plugin should be called"); + + is( + $event->{"called_with"}[0]->name, + "bogus-1", + "Just bogus-1 should be executed." + ); + }; + subtest "a task with an invalid plugin is not fatal" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, "this-plugin-does-not-exist"); + is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey."); + + is(@{$event->{"called_with"}}, 0, "No plugins are called"); + }; +}; + + +done_testing;