diff --git a/src/lib/Hydra/Math.pm b/src/lib/Hydra/Math.pm new file mode 100644 index 00000000..40c7f335 --- /dev/null +++ b/src/lib/Hydra/Math.pm @@ -0,0 +1,30 @@ +package Hydra::Math; + +use strict; +use warnings; +use List::Util qw(min); +use Exporter 'import'; +our @EXPORT_OK = qw(exponential_backoff); + +=head2 exponential_backoff + +Calculates a number of seconds to wait before reattempting something. + +Arguments: + +=over 1 + +=item C<$attempts> + +Integer number of attempts made. + +=back + +=cut +sub exponential_backoff { + my ($attempt) = @_; + my $clamp = min(10, $attempt); + return 2 ** $clamp; +} + +1; diff --git a/src/lib/Hydra/Schema/Result/TaskRetries.pm b/src/lib/Hydra/Schema/Result/TaskRetries.pm new file mode 100644 index 00000000..538252de --- /dev/null +++ b/src/lib/Hydra/Schema/Result/TaskRetries.pm @@ -0,0 +1,120 @@ +use utf8; +package Hydra::Schema::Result::TaskRetries; + +# Created by DBIx::Class::Schema::Loader +# DO NOT MODIFY THE FIRST PART OF THIS FILE + +=head1 NAME + +Hydra::Schema::Result::TaskRetries + +=cut + +use strict; +use warnings; + +use base 'DBIx::Class::Core'; + +=head1 COMPONENTS LOADED + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->load_components("+Hydra::Component::ToJSON"); + +=head1 TABLE: C + +=cut + +__PACKAGE__->table("taskretries"); + +=head1 ACCESSORS + +=head2 id + + data_type: 'integer' + is_auto_increment: 1 + is_nullable: 0 + sequence: 'taskretries_id_seq' + +=head2 channel + + data_type: 'text' + is_nullable: 0 + +=head2 pluginname + + data_type: 'text' + is_nullable: 0 + +=head2 payload + + data_type: 'text' + is_nullable: 0 + +=head2 attempts + + data_type: 'integer' + is_nullable: 0 + +=head2 retry_at + + data_type: 'integer' + is_nullable: 0 + +=cut + +__PACKAGE__->add_columns( + "id", + { + data_type => "integer", + is_auto_increment => 1, + is_nullable => 0, + sequence => "taskretries_id_seq", + }, + "channel", + { data_type => "text", is_nullable => 0 }, + "pluginname", + { data_type => "text", is_nullable => 0 }, + "payload", + { data_type => "text", is_nullable => 0 }, + "attempts", + { data_type => "integer", is_nullable => 0 }, + "retry_at", + { data_type => "integer", is_nullable => 0 }, +); + +=head1 PRIMARY KEY + +=over 4 + +=item * L + +=back + +=cut + +__PACKAGE__->set_primary_key("id"); + + +# Created by DBIx::Class::Schema::Loader v0.07049 @ 2021-08-26 16:30:59 +# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:4MC8UnsgrvJVRrIURvSH5A + +use Hydra::Math qw(exponential_backoff); + +sub requeue { + my ($self) = @_; + + $self->update({ + attempts => $self->attempts + 1, + retry_at => time() + exponential_backoff($self->attempts + 1), + }); + +} + +1; diff --git a/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm new file mode 100644 index 00000000..526d3f40 --- /dev/null +++ b/src/lib/Hydra/Schema/ResultSet/TaskRetries.pm @@ -0,0 +1,71 @@ +package Hydra::Schema::ResultSet::TaskRetries; + +use strict; +use warnings; +use utf8; +use base 'DBIx::Class::ResultSet'; +use List::Util qw(max); +use Hydra::Math qw(exponential_backoff); +use Hydra::Task; + +=head2 get_seconds_to_next_retry + +Query the database to identify how soon the next retryable task is due +for being attempted again. + +If there are no tasks to be reattempted, it returns undef. + +If a task's scheduled retry has passed, it returns 0. + +Otherwise, returns the number of seconds to wait before looking for more work. + +=cut +sub get_seconds_to_next_retry { + my ($self) = @_; + + my $next_retry = $self->search( + {}, # any task + { + order_by => { + -asc => 'retry_at' + }, + rows => 1, + } + )->get_column('retry_at')->first; + + if (defined($next_retry)) { + return max(0, $next_retry - time()); + } else { + return undef; + } +} + +=head2 save_task + +Save a failing L in the database, with a retry scheduled +for a few seconds away. + +Arguments: + +=over 1 + +=item C<$task> + +L The failing task to retry. + +=back + +=cut +sub save_task { + my ($self, $task) = @_; + + return $self->create({ + channel => $task->{"event"}->{"channel_name"}, + pluginname => $task->{"plugin_name"}, + payload => $task->{"event"}->{"payload"}, + attempts => 1, + retry_at => time() + exponential_backoff(1), + }); +} + +1; diff --git a/src/lib/Hydra/Task.pm b/src/lib/Hydra/Task.pm new file mode 100644 index 00000000..31d57d3c --- /dev/null +++ b/src/lib/Hydra/Task.pm @@ -0,0 +1,15 @@ +package Hydra::Task; + +use strict; +use warnings; + +sub new { + my ($self, $event, $plugin_name) = @_; + + return bless { + "event" => $event, + "plugin_name" => $plugin_name, + }, $self; +} + +1; diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm new file mode 100644 index 00000000..a45f88ab --- /dev/null +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -0,0 +1,297 @@ +package Hydra::TaskDispatcher; + +use strict; +use warnings; +use Hydra::Task; +use Time::HiRes qw( gettimeofday tv_interval ); + +=head1 Hydra::TaskDispatcher + +Excecute many plugins with Hydra::Event as its input. + +The TaskDispatcher is responsible for dealing with fanout +from one incoming Event being executed across many plugins, +or one Event being executed against a single plugin by first +wrapping it in a Task. + +Its execution model is based on creating a Hydra::Task for +each plugin's execution. The task represents the name of +the plugin to run and the Event to process. + +The dispatcher's behavior is slightly different based on +if the Task has an associated record: + +=over 1 + +=item * +If a task succeeds and there is no record, the Dispatcher +assumes there is no further accounting of the task to be +done. + +=item * +If a task succeeds and there is a record, the Dispatcher +calls C on the record. + +=item * +If a task fails and there is no record, the Dispatcher +calls C<$store_task> with the Task as its only argument. +It is the C<$store_task>'s responsibility to store the +task in some way for retrying. + +=item * +If a task fails and there is a record, the Dispatcher +calls C on the record. + +=back + +=cut + +=head2 new + +Arguments: + +=over 1 + +=item C<$dbh> +L The database connection. + +=item C<$prometheus> +L A Promethues implementation, either Prometheus::Tiny +or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus. + +=item C<%plugins> +L A list of Hydra plugins to execute events and tasks against. + +=item C<$store_task> (Optional) +A sub to call when storing a task for the first time. This sub is called +after a L's execution fails without an associated record. +The sub is called with the failing task, and is responsible for storing +the task for another attempt. + +If no C<$store_task> sub is provided, all failed events are dropped. + +=back + +=cut + +sub new { + my ($self, $db, $prometheus, $plugins, $store_task) = @_; + + $prometheus->declare( + "notify_plugin_executions", + type => "counter", + help => "Number of times each plugin has been called by channel." + ); + $prometheus->declare( + "notify_plugin_runtime", + type => "histogram", + help => "Number of seconds spent executing each plugin by channel." + ); + $prometheus->declare( + "notify_plugin_success", + type => "counter", + help => "Number of successful executions of this plugin on this channel." + ); + $prometheus->declare( + "notify_plugin_error", + type => "counter", + help => "Number of failed executions of this plugin on this channel." + ); + $prometheus->declare( + "notify_plugin_retry_success", + type => "counter", + help => "Number of successful executions of retried tasks." + ); + $prometheus->declare( + "notify_plugin_drop", + type => "counter", + help => "Number of tasks that have been dropped after too many retries." + ); + $prometheus->declare( + "notify_plugin_requeue", + type => "counter", + help => "Number of tasks that have been requeued after a failure." + ); + + my %plugins_by_name = map { ref $_ => $_ } @{$plugins}; + + if (!defined($store_task)) { + $store_task = sub {}; + } + + my $obj = bless { + "db" => $db, + "prometheus" => $prometheus, + "plugins_by_name" => \%plugins_by_name, + "store_task" => $store_task, + }, $self; +} + +=head2 dispatch_event + +Execute each configured plugin against the provided L. + +Arguments: + +=over 1 + +=item C<$event> + +L The event, usually from L. + +=back + +=cut + +sub dispatch_event { + my ($self, $event) = @_; + + foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) { + my $task = Hydra::Task->new($event, $plugin_name); + $self->dispatch_task($task); + } +} + +=head2 dispatch_task + +Execute a specific plugin against the provided L. +The Task includes information about what plugin should be executed. +If the provided plugin does not exist, an error logged is logged and the +function returns falsey. + +Arguments: + +=over 1 + +=item C<$task> + +L The task, usually from L. + +=back + +=cut +sub dispatch_task { + my ($self, $task) = @_; + + my $channel_name = $task->{"event"}->{'channel_name'}; + my $plugin_name = $task->{"plugin_name"}; + my $event_labels = $self->prom_labels_for_task($task); + + my $plugin = $self->{"plugins_by_name"}->{$plugin_name}; + + if (!defined($plugin)) { + $self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels); + print STDERR "No plugin named $plugin_name\n"; + return 0; + } + + $self->{"prometheus"}->inc("notify_plugin_executions", $event_labels); + eval { + my $start_time = [gettimeofday()]; + + $task->{"event"}->execute($self->{"db"}, $plugin); + + $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels); + $self->{"prometheus"}->inc("notify_plugin_success", $event_labels); + $self->success($task); + 1; + } or do { + $self->failure($task); + $self->{"prometheus"}->inc("notify_plugin_error", $event_labels); + print STDERR "error running $channel_name hooks: $@\n"; + return 0; + } +} + +=head2 success + +Mark a task's execution as successful. + +If the task has an associated record, the record is deleted. + +Arguments: + +=over 1 + +=item C<$task> + +L The task to mark as successful. + +=back + +=cut +sub success { + my ($self, $task) = @_; + + my $event_labels = $self->prom_labels_for_task($task); + + if (defined($task->{"record"})) { + $self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels); + $task->{"record"}->delete(); + } +} + +=head2 failure + +Mark a task's execution as failed. + +The task is requeued if it has been attempted fewer than than 100 times. + +Arguments: + +=over 1 + +=item C<$task> + +L The task to mark as successful. + +=back + +=cut +sub failure { + my ($self, $task) = @_; + + my $event_labels = $self->prom_labels_for_task($task); + + if (defined($task->{"record"})) { + if ($task->{"record"}->attempts > 100) { + $self->{"prometheus"}->inc("notify_plugin_drop", $event_labels); + $task->{"record"}->delete(); + } else { + $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels); + $task->{"record"}->requeue(); + } + } else { + $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels); + $self->{"store_task"}($task); + } +} + +=head2 prom_labels_for_task + +Given a specific task, return a hash of standard labels to record with +Prometheus. + +Arguments: + +=over 1 + +=item C<$task> + +L The task to return labels for. + +=back + +=cut +sub prom_labels_for_task { + my ($self, $task) = @_; + + my $channel_name = $task->{"event"}->{'channel_name'}; + my $plugin_name = $task->{"plugin_name"}; + return { + channel => $channel_name, + plugin => $plugin_name, + }; +} + +1; diff --git a/src/script/hydra-notify b/src/script/hydra-notify index 770f0620..3459cf2c 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -10,9 +10,9 @@ use Hydra::Helper::AddBuilds; use Hydra::Helper::Nix; use Hydra::Plugin; use Hydra::PostgresListener; +use Hydra::TaskDispatcher; use Parallel::ForkManager; use Prometheus::Tiny::Shared; -use Time::HiRes qw( gettimeofday tv_interval ); STDERR->autoflush(1); STDOUT->autoflush(1); @@ -25,26 +25,6 @@ my $prom = Prometheus::Tiny::Shared->new; # Add a new declaration for any new metrics you create. Metrics which are # not pre-declared disappear when their value is null. See: # https://metacpan.org/pod/Prometheus::Tiny#declare -$prom->declare( - "notify_plugin_executions", - type => "counter", - help => "Number of times each plugin has been called by channel." -); -$prom->declare( - "notify_plugin_runtime", - type => "histogram", - help => "Number of seconds spent executing each plugin by channel." -); -$prom->declare( - "notify_plugin_success", - type => "counter", - help => "Number of successful executions of this plugin on this channel." -); -$prom->declare( - "notify_plugin_error", - type => "counter", - help => "Number of failed executions of this plugin on this channel." -); $prom->declare( "event_loop_iterations", type => "counter", @@ -93,6 +73,15 @@ GetOptions( my $db = Hydra::Model::DB->new(); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); +my $task_dispatcher = Hydra::TaskDispatcher->new( + $db, + $prom, + [@plugins], + sub { + my ($task) = @_; + $db->resultset("TaskRetries")->save_task($task); + } +); my $dbh = $db->storage->dbh; @@ -102,43 +91,22 @@ $listener->subscribe("build_finished"); $listener->subscribe("step_finished"); $listener->subscribe("hydra_notify_dump_metrics"); -sub runPluginsForEvent { - my ($event) = @_; - - my $channelName = $event->{'channel_name'}; - - foreach my $plugin (@plugins) { - $prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin }); - eval { - my $startTime = [gettimeofday()]; - $event->execute($db, $plugin); - - $prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin }); - $prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin }); - 1; - } or do { - $prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin }); - print STDERR "error running $event->{'channel_name'} hooks: $@\n"; - } - } -} - # Process builds that finished while hydra-notify wasn't running. for my $build ($db->resultset('Builds')->search( { notificationpendingsince => { '!=', undef } })) { print STDERR "sending notifications for build ${\$build->id}...\n"; - - my $event = Hydra::Event::BuildFinished->new($build->id); - runPluginsForEvent($event); + my $event = Hydra::Event->new_event("build_finished", $build->id); + $task_dispatcher->dispatch_event($event); } +my $taskretries = $db->resultset('TaskRetries'); # Process incoming notifications. while (!$queued_only) { $prom->inc("event_loop_iterations"); - my $messages = $listener->block_for_messages(); + my $messages = $listener->block_for_messages($taskretries->get_seconds_to_next_retry()); while (my $message = $messages->()) { $prom->set("event_received", time()); my $channelName = $message->{"channel"}; @@ -154,7 +122,7 @@ while (!$queued_only) { eval { my $event = Hydra::Event->new_event($channelName, $message->{"payload"}); - runPluginsForEvent($event); + $task_dispatcher->dispatch_event($event); 1; } or do { @@ -162,4 +130,10 @@ while (!$queued_only) { print STDERR "error processing message '$payload' on channel '$channelName': $@\n"; } } + + my $task = $taskretries->getRetryableTask(); + if (defined($task)) { + $task_dispatcher->dispatchTask($task); + } + } diff --git a/src/sql/hydra.sql b/src/sql/hydra.sql index 9f5857c2..0acbc537 100644 --- a/src/sql/hydra.sql +++ b/src/sql/hydra.sql @@ -553,6 +553,21 @@ create table StarredJobs ( foreign key (project, jobset) references Jobsets(project, name) on update cascade on delete cascade ); +-- Events processed by hydra-notify which have failed at least once +-- +-- The payload field contains the original, unparsed payload. +-- +-- One row is created for each plugin which fails to process the event, +-- with an increasing retry_at and attempts field. +create table TaskRetries ( + id serial primary key not null, + channel text not null, + pluginname text not null, + payload text not null, + attempts integer not null, + retry_at integer not null +); +create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc); -- The output paths that have permanently failed. create table FailedPaths ( diff --git a/src/sql/update-dbix.pl b/src/sql/update-dbix.pl index a9a18f0e..97061f21 100644 --- a/src/sql/update-dbix.pl +++ b/src/sql/update-dbix.pl @@ -39,6 +39,7 @@ make_schema_at("Hydra::Schema", { "starredjobs" => "StarredJobs", "systemstatus" => "SystemStatus", "systemtypes" => "SystemTypes", + "taskretries" => "TaskRetries", "urirevmapper" => "UriRevMapper", "userroles" => "UserRoles", "users" => "Users", diff --git a/src/sql/upgrade-77.sql b/src/sql/upgrade-77.sql new file mode 100644 index 00000000..bc67cc9d --- /dev/null +++ b/src/sql/upgrade-77.sql @@ -0,0 +1,15 @@ +-- Events processed by hydra-notify which have failed at least once +-- +-- The payload field contains the original, unparsed payload. +-- +-- One row is created for each plugin which fails to process the event, +-- with an increasing retry_at and attempts field. +create table TaskRetries ( + id serial primary key not null, + channel text not null, + pluginname text not null, + payload text not null, + attempts integer not null, + retry_at integer not null +); +create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc); diff --git a/t/Math.t b/t/Math.t new file mode 100644 index 00000000..a9a49b6f --- /dev/null +++ b/t/Math.t @@ -0,0 +1,19 @@ +use strict; +use warnings; +use Setup; + +use Hydra::Math qw(exponential_backoff); + +use Test2::V0; + +subtest "exponential_backoff" => sub { + is(exponential_backoff(0), 1); + is(exponential_backoff(1), 2); + is(exponential_backoff(2), 4); + is(exponential_backoff(9), 512); + is(exponential_backoff(10), 1024); + is(exponential_backoff(11), 1024, "we're clamped to 1024 seconds"); + is(exponential_backoff(11000), 1024, "we're clamped to 1024 seconds"); +}; + +done_testing; diff --git a/t/Schema/Result/TaskRetries.t b/t/Schema/Result/TaskRetries.t new file mode 100644 index 00000000..0425f11c --- /dev/null +++ b/t/Schema/Result/TaskRetries.t @@ -0,0 +1,35 @@ +use strict; +use warnings; +use Setup; + +my %ctx = test_init(); + +require Hydra::Schema; +require Hydra::Model::DB; + +use Test2::V0; + +my $db = Hydra::Model::DB->new; +hydra_setup($db); + +my $taskretries = $db->resultset('TaskRetries'); + +subtest "requeue" => sub { + my $task = $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time(), + }); + + $task->requeue(); + is($task->attempts, 2, "We should have stored a second retry"); + is($task->retry_at, within(time() + 4, 2), "Delayed two exponential backoff step"); + + $task->requeue(); + is($task->attempts, 3, "We should have stored a third retry"); + is($task->retry_at, within(time() + 8, 2), "Delayed a third exponential backoff step"); +}; + +done_testing; diff --git a/t/Schema/ResultSet/TaskRetries.t b/t/Schema/ResultSet/TaskRetries.t new file mode 100644 index 00000000..1fcb4776 --- /dev/null +++ b/t/Schema/ResultSet/TaskRetries.t @@ -0,0 +1,63 @@ +use strict; +use warnings; +use Setup; + +my %ctx = test_init(); + +use Hydra::Event; +use Hydra::Task; +require Hydra::Schema; +require Hydra::Model::DB; + +use Test2::V0; + +my $db = Hydra::Model::DB->new; +hydra_setup($db); + +my $taskretries = $db->resultset('TaskRetries'); + +subtest "get_seconds_to_next_retry" => sub { + subtest "Without any records in the database" => sub { + is($taskretries->get_seconds_to_next_retry(), undef, "Without any records our next retry moment is forever away."); + }; + + subtest "With only tasks whose retry timestamps are in the future" => sub { + $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time() + 100, + }); + is($taskretries->get_seconds_to_next_retry(), within(100, 2), "We should retry in roughly 100 seconds"); + }; + + subtest "With tasks whose retry timestamp are in the past" => sub { + $taskretries->create({ + channel => "bogus", + pluginname => "bogus", + payload => "bogus", + attempts => 1, + retry_at => time() - 100, + }); + is($taskretries->get_seconds_to_next_retry(), 0, "We should retry immediately"); + } +}; + +subtest "save_task" => sub { + my $event = Hydra::Event->new_event("build_started", "1"); + my $task = Hydra::Task->new( + $event, + "FooPluginName", + ); + + my $retry = $taskretries->save_task($task); + + is($retry->channel, "build_started", "Channel name should match"); + is($retry->pluginname, "FooPluginName", "Plugin name should match"); + is($retry->payload, "1", "Payload should match"); + is($retry->attempts, 1, "We've had one attempt"); + is($retry->retry_at, within(time() + 1, 2), "The retry at should be approximately one second away"); +}; + +done_testing; diff --git a/t/TaskDispatcher.t b/t/TaskDispatcher.t new file mode 100644 index 00000000..c055ce45 --- /dev/null +++ b/t/TaskDispatcher.t @@ -0,0 +1,222 @@ +use strict; +use warnings; +use Setup; + +use Hydra::TaskDispatcher; +use Prometheus::Tiny::Shared; + +use Test2::V0; +use Test2::Tools::Mock qw(mock_obj); + +my $db = "bogus db"; +my $prometheus = Prometheus::Tiny::Shared->new; + +sub make_noop_plugin { + my ($name) = @_; + my $plugin = { + "name" => $name, + }; + my $mock_plugin = mock_obj $plugin => (); + + return $mock_plugin; +} + +sub make_fake_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + } + ] + ); + + return $mock_event; +} + +sub make_failing_event { + my ($channel_name) = @_; + + my $event = { + channel_name => $channel_name, + called_with => [], + }; + my $mock_event = mock_obj $event => ( + add => [ + "execute" => sub { + my ($self, $db, $plugin) = @_; + push @{$self->{"called_with"}}, $plugin; + die "Failing plugin." + } + ] + ); + + return $mock_event; +} + +sub make_fake_record { + my %attrs = @_; + + my $record = { + "attempts" => $attrs{"attempts"} || 0, + "requeued" => 0, + "deleted" => 0 + }; + + my $mock_record = mock_obj $record => ( + add => [ + "delete" => sub { + my ($self, $db, $plugin) = @_; + $self->{"deleted"} = 1; + }, + "requeue" => sub { + my ($self, $db, $plugin) = @_; + $self->{"requeued"} = 1; + } + ] + ); + + return $mock_record; +} + +subtest "dispatch_event" => sub { + subtest "every plugin gets called once, even if it fails all of them." => sub { + my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_failing_event("bogus-channel"); + $dispatcher->dispatch_event($event); + + is(@{$event->{"called_with"}}, 2, "Both plugins should be called"); + + my @expected_names = [ "bogus-1", "bogus-2" ]; + my @actual_names = sort([ + $event->{"called_with"}[0]->name, + $event->{"called_with"}[1]->name + ]); + + is( + @actual_names, + @expected_names, + "Both plugins should be executed, but not in any particular order." + ); + }; +}; + +subtest "dispatch_task" => sub { + subtest "every plugin gets called once" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, ref $bogus_plugin); + is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy."); + + is(@{$event->{"called_with"}}, 1, "Just one plugin should be called"); + + is( + $event->{"called_with"}[0]->name, + "bogus-1", + "Just bogus-1 should be executed." + ); + }; + + subtest "a task with an invalid plugin is not fatal" => sub { + my $bogus_plugin = make_noop_plugin("bogus-1"); + my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins); + + my $event = make_fake_event("bogus-channel"); + my $task = Hydra::Task->new($event, "this-plugin-does-not-exist"); + is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey."); + + is(@{$event->{"called_with"}}, 0, "No plugins are called"); + }; + + subtest "a failed run without a record saves the task for later" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => undef, + }; + + my $save_hook_called = 0; + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin], + sub { + $save_hook_called = 1; + } + ); + $dispatcher->dispatch_task($task); + + is($save_hook_called, 1, "The record was requeued with the store hook."); + }; + + subtest "a successful run from a record deletes the record" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_fake_event("success-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; + + subtest "a failed run from a record re-queues the task" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"requeued"}, 1, "The record was requeued."); + }; + + subtest "a failed run from a record with a lot of attempts deletes the task" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(attempts => 101); + + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; +}; + + +done_testing;