From 1f2adf61aafa90e18ce13b854f8bc73839f161fa Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 15:39:29 -0400 Subject: [PATCH 01/10] hydra-notify: extract runPluginsForEvent to a TaskDispatcher --- src/lib/Hydra/Task.pm | 15 +++ src/lib/Hydra/TaskDispatcher.pm | 157 ++++++++++++++++++++++++++++++++ src/script/hydra-notify | 52 +---------- t/TaskDispatcher.t | 122 +++++++++++++++++++++++++ 4 files changed, 299 insertions(+), 47 deletions(-) create mode 100644 src/lib/Hydra/Task.pm create mode 100644 src/lib/Hydra/TaskDispatcher.pm create mode 100644 t/TaskDispatcher.t diff --git a/src/lib/Hydra/Task.pm b/src/lib/Hydra/Task.pm new file mode 100644 index 00000000..31d57d3c --- /dev/null +++ b/src/lib/Hydra/Task.pm @@ -0,0 +1,15 @@ +package Hydra::Task; + +use strict; +use warnings; + +sub new { + my ($self, $event, $plugin_name) = @_; + + return bless { + "event" => $event, + "plugin_name" => $plugin_name, + }, $self; +} + +1; diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm new file mode 100644 index 00000000..425a89a3 --- /dev/null +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -0,0 +1,157 @@ +package Hydra::TaskDispatcher; + +use strict; +use warnings; +use Hydra::Task; +use Time::HiRes qw( gettimeofday tv_interval ); + +=head1 Hydra::TaskDispatcher + +Excecute many plugins with Hydra::Event as its input. + +The TaskDispatcher is responsible for dealing with fanout +from one incoming Event being executed across many plugins, +or one Event being executed against a single plugin by first +wrapping it in a Task. + +Its execution model is based on creating a Hydra::Task for +each plugin's execution. The task represents the name of +the plugin to run and the Event to process. + +=cut + +=head2 new + +Arguments: + +=over 1 + +=item C<$dbh> +L The database connection. + +=back + +=item C<$prometheus> +L A Promethues implementation, either Prometheus::Tiny +or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus. + +=back + +=item C<%plugins> +L A list of Hydra plugins to execute events and tasks against. + +=back + +=cut + +sub new { + my ($self, $db, $prometheus, $plugins) = @_; + + $prometheus->declare( + "notify_plugin_executions", + type => "counter", + help => "Number of times each plugin has been called by channel." + ); + $prometheus->declare( + "notify_plugin_runtime", + type => "histogram", + help => "Number of seconds spent executing each plugin by channel." + ); + $prometheus->declare( + "notify_plugin_success", + type => "counter", + help => "Number of successful executions of this plugin on this channel." + ); + $prometheus->declare( + "notify_plugin_error", + type => "counter", + help => "Number of failed executions of this plugin on this channel." + ); + + my %plugins_by_name = map { ref $_ => $_ } @{$plugins}; + + my $obj = bless { + "db" => $db, + "prometheus" => $prometheus, + "plugins_by_name" => \%plugins_by_name, + }, $self; +} + +=head2 dispatch_event + +Execute each configured plugin against the provided L. + +Arguments: + +=over 1 + +=item C<$event> + +L the event, usually from L. + +=back + +=cut + +sub dispatch_event { + my ($self, $event) = @_; + + foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) { + my $task = Hydra::Task->new($event, $plugin_name); + $self->dispatch_task($task); + } +} + +=head2 dispatch_task + +Execute a specifi plugin against the provided L. +The Task includes information about what plugin should be executed. +If the provided plugin does not exist, an error logged is logged and the +function returns falsey. + +Arguments: + +=over 1 + +=item C<$task> + +L the task, usually from L. + +=back + +=cut +sub dispatch_task { + my ($self, $task) = @_; + + my $channel_name = $task->{"event"}->{'channel_name'}; + my $plugin_name = $task->{"plugin_name"}; + my $event_labels = { + channel => $channel_name, + plugin => $plugin_name, + }; + + my $plugin = $self->{"plugins_by_name"}->{$plugin_name}; + + if (!defined($plugin)) { + $self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels); + print STDERR "No plugin named $plugin_name\n"; + return 0; + } + + $self->{"prometheus"}->inc("notify_plugin_executions", $event_labels); + eval { + my $start_time = [gettimeofday()]; + + $task->{"event"}->execute($self->{"db"}, $plugin); + + $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels); + $self->{"prometheus"}->inc("notify_plugin_success", $event_labels); + 1; + } or do { + $self->{"prometheus"}->inc("notify_plugin_error", $event_labels); + print STDERR "error running $channel_name hooks: $@\n"; + return 0; + } +} + +1; diff --git a/src/script/hydra-notify b/src/script/hydra-notify index 770f0620..42b1978e 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -10,9 +10,9 @@ use Hydra::Helper::AddBuilds; use Hydra::Helper::Nix; use Hydra::Plugin; use Hydra::PostgresListener; +use Hydra::TaskDispatcher; use Parallel::ForkManager; use Prometheus::Tiny::Shared; -use Time::HiRes qw( gettimeofday tv_interval ); STDERR->autoflush(1); STDOUT->autoflush(1); @@ -25,26 +25,6 @@ my $prom = Prometheus::Tiny::Shared->new; # Add a new declaration for any new metrics you create. Metrics which are # not pre-declared disappear when their value is null. See: # https://metacpan.org/pod/Prometheus::Tiny#declare -$prom->declare( - "notify_plugin_executions", - type => "counter", - help => "Number of times each plugin has been called by channel." -); -$prom->declare( - "notify_plugin_runtime", - type => "histogram", - help => "Number of seconds spent executing each plugin by channel." -); -$prom->declare( - "notify_plugin_success", - type => "counter", - help => "Number of successful executions of this plugin on this channel." -); -$prom->declare( - "notify_plugin_error", - type => "counter", - help => "Number of failed executions of this plugin on this channel." -); $prom->declare( "event_loop_iterations", type => "counter", @@ -93,6 +73,7 @@ GetOptions( my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); +my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins]); my $dbh = $db->storage->dbh; @@ -102,39 +83,16 @@ $listener->subscribe("build_finished"); $listener->subscribe("step_finished"); $listener->subscribe("hydra_notify_dump_metrics"); -sub runPluginsForEvent { - my ($event) = @_; - - my $channelName = $event->{'channel_name'}; - - foreach my $plugin (@plugins) { - $prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin }); - eval { - my $startTime = [gettimeofday()]; - $event->execute($db, $plugin); - - $prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin }); - $prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin }); - 1; - } or do { - $prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin }); - print STDERR "error running $event->{'channel_name'} hooks: $@\n"; - } - } -} - # Process builds that finished while hydra-notify wasn't running. for my $build ($db->resultset('Builds')->search( { notificationpendingsince => { '!=', undef } })) { print STDERR "sending notifications for build ${\$build->id}...\n"; - - my $event = Hydra::Event::BuildFinished->new($build->id); - runPluginsForEvent($event); + my $event = Hydra::Event->new_event("build_finished", $build->id); + $task_dispatcher->dispatch_event($event); } - # Process incoming notifications. while (!$queued_only) { $prom->inc("event_loop_iterations"); @@ -154,7 +112,7 @@ while (!$queued_only) { eval { my $event = Hydra::Event->new_event($channelName, $message->{"payload"}); - runPluginsForEvent($event); + $task_dispatcher->dispatch_event($event); 1; } or do { diff --git a/t/TaskDispatcher.t b/t/TaskDispatcher.t new file mode 100644 index 00000000..ef3a8579 --- /dev/null +++ b/t/TaskDispatcher.t @@ -0,0 +1,122 @@ +use strict; +use warnings; +use Setup; + +use Hydra::TaskDispatcher; +use Prometheus::Tiny::Shared; + +use Test2::V0; +use Test2::Tools::Mock qw(mock_obj); + +my $db = "bogus db"; +my $prometheus = Prometheus::Tiny::Shared->new; + +sub make_noop_plugin { + my ($name) = @_; + my $plugin = { + "name" => $name, + }; + my $mock_plugin = mock_obj $plugin => (); + + return $mock_plugin; +} + +sub make_fake_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + } + ] + ); + + return $mock_event; +} + +sub make_failing_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + die "Failing plugin." + } + ] + ); + + return $mock_event; +} + +subtest "dispatch_event" => sub { + subtest "every plugin gets called once, even if it fails all of them." => sub { + my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_failing_event("bogus-channel"); + $dispatcher->dispatch_event($event); + + is(@{$event->{"called_with"}}, 2, "Both plugins should be called"); + + my @expected_names = [ "bogus-1", "bogus-2" ]; + my @actual_names = sort([ + $event->{"called_with"}[0]->name, + $event->{"called_with"}[1]->name + ]); + + is( + @actual_names, + @expected_names, + "Both plugins should be executed, but not in any particular order." + ); + }; +}; + +subtest "dispatch_task" => sub { + subtest "every plugin gets called once" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, ref $bogus_plugin); + is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy."); + + is(@{$event->{"called_with"}}, 1, "Just one plugin should be called"); + + is( + $event->{"called_with"}[0]->name, + "bogus-1", + "Just bogus-1 should be executed." + ); + }; + subtest "a task with an invalid plugin is not fatal" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, "this-plugin-does-not-exist"); + is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey."); + + is(@{$event->{"called_with"}}, 0, "No plugins are called"); + }; +}; + + +done_testing; From c4134c8e840444387d11290e4ad3d4062bf9fdba Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 16:32:41 -0400 Subject: [PATCH 02/10] TaskRetries: init table --- src/lib/Hydra/Schema/Result/TaskRetries.pm | 110 +++++++++++++++++++++ src/sql/hydra.sql | 15 +++ src/sql/update-dbix.pl | 1 + src/sql/upgrade-77.sql | 15 +++ 4 files changed, 141 insertions(+) create mode 100644 src/lib/Hydra/Schema/Result/TaskRetries.pm create mode 100644 src/sql/upgrade-77.sql diff --git a/src/lib/Hydra/Schema/Result/TaskRetries.pm b/src/lib/Hydra/Schema/Result/TaskRetries.pm new file mode 100644 index 00000000..08c7e8f6 --- /dev/null +++ b/src/lib/Hydra/Schema/Result/TaskRetries.pm @@ -0,0 +1,110 @@ +use utf8; +package Hydra::Schema::Result::TaskRetries; + +# Created by DBIx::Class::Schema::Loader +# DO NOT MODIFY THE FIRST PART OF THIS FILE + +=head1 NAME + +Hydra::Schema::Result::TaskRetries + +=cut + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; + +=head1 COMPONENTS LOADED + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->load_components("+Hydra::Component::ToJSON"); + +=head1 TABLE: C + +=cut + +__PACKAGE__->table("taskretries"); + +=head1 ACCESSORS + +=head2 id + + data_type: 'integer' + is_auto_increment: 1 + is_nullable: 0 + sequence: 'taskretries_id_seq' + +=head2 channel + + data_type: 'text' + is_nullable: 0 + +=head2 pluginname + + data_type: 'text' + is_nullable: 0 + +=head2 payload + + data_type: 'text' + is_nullable: 0 + +=head2 attempts + + data_type: 'integer' + is_nullable: 0 + +=head2 retry_at + + data_type: 'integer' + is_nullable: 0 + +=cut + +__PACKAGE__->add_columns( + "id", + { + data_type => "integer", + is_auto_increment => 1, + is_nullable => 0, + sequence => "taskretries_id_seq", + }, + "channel", + { data_type => "text", is_nullable => 0 }, + "pluginname", + { data_type => "text", is_nullable => 0 }, + "payload", + { data_type => "text", is_nullable => 0 }, + "attempts", + { data_type => "integer", is_nullable => 0 }, + "retry_at", + { data_type => "integer", is_nullable => 0 }, +); + +=head1 PRIMARY KEY + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->set_primary_key("id"); + + +# Created by DBIx::Class::Schema::Loader v0.07049 @ 2021-08-26 16:30:59 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:4MC8UnsgrvJVRrIURvSH5A + + +# You can replace this text with custom code or comments, and it will be preserved on regeneration +1; diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql index 9f5857c2..0acbc537 100644 --- a/src/sql/hydra.sql +++ b/src/sql/hydra.sql @@ -553,6 +553,21 @@ create table StarredJobs ( foreign key (project, jobset) references Jobsets(project, name) on update cascade on delete cascade ); +-- Events processed by hydra-notify which have failed at least once +-- +-- The payload field contains the original, unparsed payload. +-- +-- One row is created for each plugin which fails to process the event, +-- with an increasing retry_at and attempts field. +create table TaskRetries ( + id serial primary key not null, + channel text not null, + pluginname text not null, + payload text not null, + attempts integer not null, + retry_at integer not null +); +create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc); -- The output paths that have permanently failed. create table FailedPaths ( diff --git a/src/sql/update-dbix.pl b/src/sql/update-dbix.pl index a9a18f0e..97061f21 100644 --- a/src/sql/update-dbix.pl +++ b/src/sql/update-dbix.pl @@ -39,6 +39,7 @@ make_schema_at("Hydra::Schema", { "starredjobs" => "StarredJobs", "systemstatus" => "SystemStatus", "systemtypes" => "SystemTypes", + "taskretries" => "TaskRetries", "urirevmapper" => "UriRevMapper", "userroles" => "UserRoles", "users" => "Users", diff --git a/src/sql/upgrade-77.sql b/src/sql/upgrade-77.sql new file mode 100644 index 00000000..bc67cc9d --- /dev/null +++ b/src/sql/upgrade-77.sql @@ -0,0 +1,15 @@ +-- Events processed by hydra-notify which have failed at least once +-- +-- The payload field contains the original, unparsed payload. +-- +-- One row is created for each plugin which fails to process the event, +-- with an increasing retry_at and attempts field. +create table TaskRetries ( + id serial primary key not null, + channel text not null, + pluginname text not null, + payload text not null, + attempts integer not null, + retry_at integer not null +); +create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc); From 29738364fb731e89a1d9a4b91a5bd69333f7e2a0 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 12:38:33 -0400 Subject: [PATCH 03/10] ResultSet::TaskRetries: add get_seconds_to_next_retry Get the number of seconds before the next retriable task is ready. This number is specifically intended to be used as a timeout, where `undef` means never time out. --- src/lib/Hydra/Schema/ResultSet/TaskRetries.pm | 41 +++++++++++++++++ t/Schema/ResultSet/TaskRetries.t | 45 +++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 src/lib/Hydra/Schema/ResultSet/TaskRetries.pm create mode 100644 t/Schema/ResultSet/TaskRetries.t diff --git a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm new file mode 100644 index 00000000..86d16b9f --- /dev/null +++ b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm @@ -0,0 +1,41 @@ +package Hydra::Schema::ResultSet::TaskRetries; + +use strict; +use warnings; +use utf8; +use base 'DBIx::Class::ResultSet'; +use List::Util qw(max); + +=head2 get_seconds_to_next_retry + +Query the database to identify how soon the next retryable task is due +for being attempted again. + +If there are no tasks to be reattempted it returns undef. + +If a task's scheduled retry has passed, it returns 0. + +Otherwise, returns the number of seconds from now to look for work. + +=cut +sub get_seconds_to_next_retry { + my ($self) = @_; + + my $next_retry = $self->search( + {}, # any task + { + order_by => { + -asc => 'retry_at' + }, + rows => 1, + } + )->get_column('retry_at')->first; + + if (defined($next_retry)) { + return max(0, $next_retry - time()); + } else { + return undef; + } +} + +1; diff --git a/t/Schema/ResultSet/TaskRetries.t b/t/Schema/ResultSet/TaskRetries.t new file mode 100644 index 00000000..237ef20a --- /dev/null +++ b/t/Schema/ResultSet/TaskRetries.t @@ -0,0 +1,45 @@ +use strict; +use warnings; +use Setup; + +my %ctx = test_init(); + +require Hydra::Schema; +require Hydra::Model::DB; + +use Test2::V0; + +my $db = Hydra::Model::DB->new; +hydra_setup($db); + +my $taskretries = $db->resultset('TaskRetries'); + +subtest "get_seconds_to_next_retry" => sub { + subtest "Without any records in the database" => sub { + is($taskretries->get_seconds_to_next_retry(), undef, "Without any records our next retry moment is forever away."); + }; + + subtest "With only tasks whose retry timestamps are in the future" => sub { + $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time() + 100, + }); + is($taskretries->get_seconds_to_next_retry(), within(100, 2), "We should retry in roughly 100 seconds"); + }; + + subtest "With tasks whose retry timestamp are in the past" => sub { + $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time() - 100, + }); + is($taskretries->get_seconds_to_next_retry(), 0, "We should retry immediately"); + } +}; + +done_testing; From 42c2d2f3872323f2435891ea88612740627b73c2 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 12:54:38 -0400 Subject: [PATCH 04/10] Hydra::Math: add an exponential_backoff function --- src/lib/Hydra/Math.pm | 30 ++++++++++++++++++++++++++++++ t/Math.t | 19 +++++++++++++++++++ 2 files changed, 49 insertions(+) create mode 100644 src/lib/Hydra/Math.pm create mode 100644 t/Math.t diff --git a/src/lib/Hydra/Math.pm b/src/lib/Hydra/Math.pm new file mode 100644 index 00000000..40c7f335 --- /dev/null +++ b/src/lib/Hydra/Math.pm @@ -0,0 +1,30 @@ +package Hydra::Math; + +use strict; +use warnings; +use List::Util qw(min); +use Exporter 'import'; +our @EXPORT_OK = qw(exponential_backoff); + +=head2 exponential_backoff + +Calculates a number of seconds to wait before reattempting something. + +Arguments: + +=over 1 + +=item C<$attempts> + +Integer number of attempts made. + +=back + +=cut +sub exponential_backoff { + my ($attempt) = @_; + my $clamp = min(10, $attempt); + return 2 ** $clamp; +} + +1; diff --git a/t/Math.t b/t/Math.t new file mode 100644 index 00000000..a9a49b6f --- /dev/null +++ b/t/Math.t @@ -0,0 +1,19 @@ +use strict; +use warnings; +use Setup; + +use Hydra::Math qw(exponential_backoff); + +use Test2::V0; + +subtest "exponential_backoff" => sub { + is(exponential_backoff(0), 1); + is(exponential_backoff(1), 2); + is(exponential_backoff(2), 4); + is(exponential_backoff(9), 512); + is(exponential_backoff(10), 1024); + is(exponential_backoff(11), 1024, "we're clamped to 1024 seconds"); + is(exponential_backoff(11000), 1024, "we're clamped to 1024 seconds"); +}; + +done_testing; From 147fa4d029a6fcbbb4d63c12812fac66773e8742 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 17:32:32 -0400 Subject: [PATCH 05/10] Result::TaskRetries: Teach about requeue --- src/lib/Hydra/Schema/Result/TaskRetries.pm | 12 +++++++- t/Schema/Result/TaskRetries.t | 35 ++++++++++++++++++++++ 2 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 t/Schema/Result/TaskRetries.t diff --git a/src/lib/Hydra/Schema/Result/TaskRetries.pm b/src/lib/Hydra/Schema/Result/TaskRetries.pm index 08c7e8f6..538252de 100644 --- a/src/lib/Hydra/Schema/Result/TaskRetries.pm +++ b/src/lib/Hydra/Schema/Result/TaskRetries.pm @@ -105,6 +105,16 @@ __PACKAGE__->set_primary_key("id"); # Created by DBIx::Class::Schema::Loader v0.07049 @ 2021-08-26 16:30:59 # DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:4MC8UnsgrvJVRrIURvSH5A +use Hydra::Math qw(exponential_backoff); + +sub requeue { + my ($self) = @_; + + $self->update({ + attempts => $self->attempts + 1, + retry_at => time() + exponential_backoff($self->attempts + 1), + }); + +} -# You can replace this text with custom code or comments, and it will be preserved on regeneration 1; diff --git a/t/Schema/Result/TaskRetries.t b/t/Schema/Result/TaskRetries.t new file mode 100644 index 00000000..0425f11c --- /dev/null +++ b/t/Schema/Result/TaskRetries.t @@ -0,0 +1,35 @@ +use strict; +use warnings; +use Setup; + +my %ctx = test_init(); + +require Hydra::Schema; +require Hydra::Model::DB; + +use Test2::V0; + +my $db = Hydra::Model::DB->new; +hydra_setup($db); + +my $taskretries = $db->resultset('TaskRetries'); + +subtest "requeue" => sub { + my $task = $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time(), + }); + + $task->requeue(); + is($task->attempts, 2, "We should have stored a second retry"); + is($task->retry_at, within(time() + 4, 2), "Delayed two exponential backoff step"); + + $task->requeue(); + is($task->attempts, 3, "We should have stored a third retry"); + is($task->retry_at, within(time() + 8, 2), "Delayed a third exponential backoff step"); +}; + +done_testing; From d0b0fc21b396defac90e7e983730b92af61a2369 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 17:32:48 -0400 Subject: [PATCH 06/10] ResultSet::TaskRetries: teach about saving tasks --- src/lib/Hydra/Schema/ResultSet/TaskRetries.pm | 29 +++++++++++++++++++ t/Schema/ResultSet/TaskRetries.t | 18 ++++++++++++ 2 files changed, 47 insertions(+) diff --git a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm index 86d16b9f..2284c64e 100644 --- a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm +++ b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm @@ -5,6 +5,7 @@ use warnings; use utf8; use base 'DBIx::Class::ResultSet'; use List::Util qw(max); +use Hydra::Math qw(exponential_backoff); =head2 get_seconds_to_next_retry @@ -38,4 +39,32 @@ sub get_seconds_to_next_retry { } } +=head2 save_task + +Save a failing L in the database, with a retry scheduled +for a few seconds away. + +Arguments: + +=over 1 + +=item C<$task> + +L The failing task to retry. + +=back + +=cut +sub save_task { + my ($self, $task) = @_; + + return $self->create({ + channel => $task->{"event"}->{"channel_name"}, + pluginname => $task->{"plugin_name"}, + payload => $task->{"event"}->{"payload"}, + attempts => 1, + retry_at => time() + exponential_backoff(1), + }); +} + 1; diff --git a/t/Schema/ResultSet/TaskRetries.t b/t/Schema/ResultSet/TaskRetries.t index 237ef20a..1fcb4776 100644 --- a/t/Schema/ResultSet/TaskRetries.t +++ b/t/Schema/ResultSet/TaskRetries.t @@ -4,6 +4,8 @@ use Setup; my %ctx = test_init(); +use Hydra::Event; +use Hydra::Task; require Hydra::Schema; require Hydra::Model::DB; @@ -42,4 +44,20 @@ subtest "get_seconds_to_next_retry" => sub { } }; +subtest "save_task" => sub { + my $event = Hydra::Event->new_event("build_started", "1"); + my $task = Hydra::Task->new( + $event, + "FooPluginName", + ); + + my $retry = $taskretries->save_task($task); + + is($retry->channel, "build_started", "Channel name should match"); + is($retry->pluginname, "FooPluginName", "Plugin name should match"); + is($retry->payload, "1", "Payload should match"); + is($retry->attempts, 1, "We've had one attempt"); + is($retry->retry_at, within(time() + 1, 2), "The retry at should be approximately one second away"); +}; + done_testing; From b0055a23df0762061daa1749756dde5f298027ae Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 17:00:39 -0400 Subject: [PATCH 07/10] TaskDispatcher: Support re-queueing tasks, and dropping tasks after 100 failures. --- src/lib/Hydra/TaskDispatcher.pm | 158 ++++++++++++++++++++++++++++++-- t/TaskDispatcher.t | 100 ++++++++++++++++++++ 2 files changed, 249 insertions(+), 9 deletions(-) diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm index 425a89a3..7c472b20 100644 --- a/src/lib/Hydra/TaskDispatcher.pm +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -18,6 +18,32 @@ Its execution model is based on creating a Hydra::Task for each plugin's execution. The task represents the name of the plugin to run and the Event to process. +The dispatcher's behavior is slightly different based on +if the Task has an associated record: + +=over 1 + +=item * +If a task succeeds and there is no record, the Dispatcher +assumes there is no further accounting of the task to be +done. + +=item * +If a task succeeds and there is a record, the Dispatcher +calls C on the record. + +=item * +If a task fails and there is no record, the Dispatcher +calls C<$store_task> with the Task as its only argument. +It is the C<$store_task>'s responsibility to store the +task in some way for retrying. + +=item * +If a task fails and there is a record, the Dispatcher +calls C on the record. + +=back + =cut =head2 new @@ -29,23 +55,27 @@ Arguments: =item C<$dbh> L The database connection. -=back - =item C<$prometheus> L A Promethues implementation, either Prometheus::Tiny or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus. -=back - =item C<%plugins> L A list of Hydra plugins to execute events and tasks against. +=item C<$store_task> (Optional) +A sub to call when storing a task for the first time. This sub is called +after a L's execution fails without an associated record. +The sub is called with the failing task, and is responsible for storing +the task for another attempt. + +If no C<$store_task> sub is provided, all failed events are dropped. + =back =cut sub new { - my ($self, $db, $prometheus, $plugins) = @_; + my ($self, $db, $prometheus, $plugins, $store_task) = @_; $prometheus->declare( "notify_plugin_executions", @@ -67,13 +97,33 @@ sub new { type => "counter", help => "Number of failed executions of this plugin on this channel." ); + $prometheus->declare( + "notify_plugin_retry_success", + type => "counter", + help => "Number of successful executions of retried tasks." + ); + $prometheus->declare( + "notify_plugin_drop", + type => "counter", + help => "Number of tasks that have been dropped after too many retries." + ); + $prometheus->declare( + "notify_plugin_requeue", + type => "counter", + help => "Number of tasks that have been requeued after a failure." + ); my %plugins_by_name = map { ref $_ => $_ } @{$plugins}; + if (!defined($store_task)) { + $store_task = sub {}; + } + my $obj = bless { "db" => $db, "prometheus" => $prometheus, "plugins_by_name" => \%plugins_by_name, + "store_task" => $store_task, }, $self; } @@ -125,10 +175,7 @@ sub dispatch_task { my $channel_name = $task->{"event"}->{'channel_name'}; my $plugin_name = $task->{"plugin_name"}; - my $event_labels = { - channel => $channel_name, - plugin => $plugin_name, - }; + my $event_labels = $self->prom_labels_for_task($task); my $plugin = $self->{"plugins_by_name"}->{$plugin_name}; @@ -146,12 +193,105 @@ sub dispatch_task { $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels); $self->{"prometheus"}->inc("notify_plugin_success", $event_labels); + $self->success($task); 1; } or do { + $self->failure($task); $self->{"prometheus"}->inc("notify_plugin_error", $event_labels); print STDERR "error running $channel_name hooks: $@\n"; return 0; } } +=head2 success + +Mark a task's execution as successful. + +If the task has an associated record, the record is deleted. + +Arguments: + +=over 1 + +=item C<$task> + +L the task to mark as successful. + +=back + +=cut +sub success { + my ($self, $task) = @_; + + my $event_labels = $self->prom_labels_for_task($task); + + if (defined($task->{"record"})) { + $self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels); + $task->{"record"}->delete(); + } +} + +=head2 failure + +Mark a task's execution as failed. + +The task is requeued if it has been attempted fewer than than 100 times. + +Arguments: + +=over 1 + +=item C<$task> + +L the task to mark as successful. + +=back + +=cut +sub failure { + my ($self, $task) = @_; + + my $event_labels = $self->prom_labels_for_task($task); + + if (defined($task->{"record"})) { + if ($task->{"record"}->{"attempts"} > 100) { + $self->{"prometheus"}->inc("notify_plugin_drop", $event_labels); + $task->{"record"}->delete(); + } else { + $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels); + $task->{"record"}->requeue(); + } + } else { + $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels); + $self->{"store_task"}($task); + } +} + +=head2 prom_labels_for_task + +Given a specific task, return a hash of standard labels to record with +Prometheus. + +Arguments: + +=over 1 + +=item C<$task> + +L the task to return labels for. + +=back + +=cut +sub prom_labels_for_task { + my ($self, $task) = @_; + + my $channel_name = $task->{"event"}->{'channel_name'}; + my $plugin_name = $task->{"plugin_name"}; + return { + channel => $channel_name, + plugin => $plugin_name, + }; +} + 1; diff --git a/t/TaskDispatcher.t b/t/TaskDispatcher.t index ef3a8579..c055ce45 100644 --- a/t/TaskDispatcher.t +++ b/t/TaskDispatcher.t @@ -60,6 +60,31 @@ sub make_failing_event { return $mock_event; } +sub make_fake_record { + my %attrs = @_; + + my $record = { + "attempts" => $attrs{"attempts"} || 0, + "requeued" => 0, + "deleted" => 0 + }; + + my $mock_record = mock_obj $record => ( + add => [ + "delete" => sub { + my ($self, $db, $plugin) = @_; + $self->{"deleted"} = 1; + }, + "requeue" => sub { + my ($self, $db, $plugin) = @_; + $self->{"requeued"} = 1; + } + ] + ); + + return $mock_record; +} + subtest "dispatch_event" => sub { subtest "every plugin gets called once, even if it fails all of them." => sub { my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")]; @@ -104,6 +129,7 @@ subtest "dispatch_task" => sub { "Just bogus-1 should be executed." ); }; + subtest "a task with an invalid plugin is not fatal" => sub { my $bogus_plugin = make_noop_plugin("bogus-1"); my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; @@ -116,6 +142,80 @@ subtest "dispatch_task" => sub { is(@{$event->{"called_with"}}, 0, "No plugins are called"); }; + + subtest "a failed run without a record saves the task for later" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => undef, + }; + + my $save_hook_called = 0; + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin], + sub { + $save_hook_called = 1; + } + ); + $dispatcher->dispatch_task($task); + + is($save_hook_called, 1, "The record was requeued with the store hook."); + }; + + subtest "a successful run from a record deletes the record" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_fake_event("success-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; + + subtest "a failed run from a record re-queues the task" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"requeued"}, 1, "The record was requeued."); + }; + + subtest "a failed run from a record with a lot of attempts deletes the task" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(attempts => 101); + + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; }; From c0e86faa78c9038cbed69604ded96f012fd0d6e7 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 17:08:22 -0400 Subject: [PATCH 08/10] hydra-notify: call TaskRetries->save_task if a task fails --- src/script/hydra-notify | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/script/hydra-notify b/src/script/hydra-notify index 42b1978e..de8f64f2 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -73,7 +73,15 @@ GetOptions( my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); -my $task_dispatcher = Hydra::TaskDispatcher->new($db, $prom, [@plugins]); +my $task_dispatcher = Hydra::TaskDispatcher->new( + $db, + $prom, + [@plugins], + sub { + my ($task) = @_; + $db->resultset("TaskRetries")->save_task($task); + } +); my $dbh = $db->storage->dbh; From ea3ae0693e1290cdf33eaf6e0948397c617efe65 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 21:53:51 -0400 Subject: [PATCH 09/10] Hook up the retryable tasks with hydra-notify --- src/lib/Hydra/Schema/ResultSet/TaskRetries.pm | 1 + src/lib/Hydra/TaskDispatcher.pm | 2 +- src/script/hydra-notify | 10 +++++++++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm index 2284c64e..10d89cd8 100644 --- a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm +++ b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm @@ -6,6 +6,7 @@ use utf8; use base 'DBIx::Class::ResultSet'; use List::Util qw(max); use Hydra::Math qw(exponential_backoff); +use Hydra::Task; =head2 get_seconds_to_next_retry diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm index 7c472b20..310fe6d6 100644 --- a/src/lib/Hydra/TaskDispatcher.pm +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -254,7 +254,7 @@ sub failure { my $event_labels = $self->prom_labels_for_task($task); if (defined($task->{"record"})) { - if ($task->{"record"}->{"attempts"} > 100) { + if ($task->{"record"}->attempts > 100) { $self->{"prometheus"}->inc("notify_plugin_drop", $event_labels); $task->{"record"}->delete(); } else { diff --git a/src/script/hydra-notify b/src/script/hydra-notify index de8f64f2..3459cf2c 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -101,10 +101,12 @@ for my $build ($db->resultset('Builds')->search( $task_dispatcher->dispatch_event($event); } +my $taskretries = $db->resultset('TaskRetries'); + # Process incoming notifications. while (!$queued_only) { $prom->inc("event_loop_iterations"); - my $messages = $listener->block_for_messages(); + my $messages = $listener->block_for_messages($taskretries->get_seconds_to_next_retry()); while (my $message = $messages->()) { $prom->set("event_received", time()); my $channelName = $message->{"channel"}; @@ -128,4 +130,10 @@ while (!$queued_only) { print STDERR "error processing message '$payload' on channel '$channelName': $@\n"; } } + + my $task = $taskretries->getRetryableTask(); + if (defined($task)) { + $task_dispatcher->dispatchTask($task); + } + } From d9df26ac5a5af490871949e6e694048aaf909b27 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Fri, 3 Sep 2021 09:13:34 -0400 Subject: [PATCH 10/10] POD: improve wording, punctuation, caps Co-authored-by: Cole Helbling --- src/lib/Hydra/Schema/ResultSet/TaskRetries.pm | 4 ++-- src/lib/Hydra/TaskDispatcher.pm | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm index 10d89cd8..526d3f40 100644 --- a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm +++ b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm @@ -13,11 +13,11 @@ use Hydra::Task; Query the database to identify how soon the next retryable task is due for being attempted again. -If there are no tasks to be reattempted it returns undef. +If there are no tasks to be reattempted, it returns undef. If a task's scheduled retry has passed, it returns 0. -Otherwise, returns the number of seconds from now to look for work. +Otherwise, returns the number of seconds to wait before looking for more work. =cut sub get_seconds_to_next_retry { diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm index 310fe6d6..a45f88ab 100644 --- a/src/lib/Hydra/TaskDispatcher.pm +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -137,7 +137,7 @@ Arguments: =item C<$event> -L the event, usually from L. +L The event, usually from L. =back @@ -154,7 +154,7 @@ sub dispatch_event { =head2 dispatch_task -Execute a specifi plugin against the provided L. +Execute a specific plugin against the provided L. The Task includes information about what plugin should be executed. If the provided plugin does not exist, an error logged is logged and the function returns falsey. @@ -165,7 +165,7 @@ Arguments: =item C<$task> -L the task, usually from L. +L The task, usually from L. =back @@ -215,7 +215,7 @@ Arguments: =item C<$task> -L the task to mark as successful. +L The task to mark as successful. =back @@ -243,7 +243,7 @@ Arguments: =item C<$task> -L the task to mark as successful. +L The task to mark as successful. =back @@ -278,7 +278,7 @@ Arguments: =item C<$task> -L the task to return labels for. +L The task to return labels for. =back