Merge pull request #1011 from DeterminateSystems/retryable-notifications

Retryable notifications
This commit is contained in:
Graham Christensen 2021-09-07 10:04:32 -04:00 committed by GitHub
commit d6aa3f8d15
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 924 additions and 47 deletions

30
src/lib/Hydra/Math.pm Normal file
View file

@ -0,0 +1,30 @@
package Hydra::Math;
use strict;
use warnings;
use List::Util qw(min);
use Exporter 'import';
our @EXPORT_OK = qw(exponential_backoff);
=head2 exponential_backoff
Calculates a number of seconds to wait before reattempting something.
Arguments:
=over 1
=item C<$attempts>
Integer number of attempts made.
=back
=cut
sub exponential_backoff {
my ($attempt) = @_;
my $clamp = min(10, $attempt);
return 2 ** $clamp;
}
1;

View file

@ -0,0 +1,120 @@
use utf8;
package Hydra::Schema::Result::TaskRetries;
# Created by DBIx::Class::Schema::Loader
# DO NOT MODIFY THE FIRST PART OF THIS FILE
=head1 NAME
Hydra::Schema::Result::TaskRetries
=cut
use strict;
use warnings;
use base 'DBIx::Class::Core';
=head1 COMPONENTS LOADED
=over 4
=item * L<Hydra::Component::ToJSON>
=back
=cut
__PACKAGE__->load_components("+Hydra::Component::ToJSON");
=head1 TABLE: C<taskretries>
=cut
__PACKAGE__->table("taskretries");
=head1 ACCESSORS
=head2 id
data_type: 'integer'
is_auto_increment: 1
is_nullable: 0
sequence: 'taskretries_id_seq'
=head2 channel
data_type: 'text'
is_nullable: 0
=head2 pluginname
data_type: 'text'
is_nullable: 0
=head2 payload
data_type: 'text'
is_nullable: 0
=head2 attempts
data_type: 'integer'
is_nullable: 0
=head2 retry_at
data_type: 'integer'
is_nullable: 0
=cut
__PACKAGE__->add_columns(
"id",
{
data_type => "integer",
is_auto_increment => 1,
is_nullable => 0,
sequence => "taskretries_id_seq",
},
"channel",
{ data_type => "text", is_nullable => 0 },
"pluginname",
{ data_type => "text", is_nullable => 0 },
"payload",
{ data_type => "text", is_nullable => 0 },
"attempts",
{ data_type => "integer", is_nullable => 0 },
"retry_at",
{ data_type => "integer", is_nullable => 0 },
);
=head1 PRIMARY KEY
=over 4
=item * L</id>
=back
=cut
__PACKAGE__->set_primary_key("id");
# Created by DBIx::Class::Schema::Loader v0.07049 @ 2021-08-26 16:30:59
# DO NOT MODIFY THIS OR ANYTHING ABOVE! md5sum:4MC8UnsgrvJVRrIURvSH5A
use Hydra::Math qw(exponential_backoff);
sub requeue {
my ($self) = @_;
$self->update({
attempts => $self->attempts + 1,
retry_at => time() + exponential_backoff($self->attempts + 1),
});
}
1;

View file

@ -0,0 +1,71 @@
package Hydra::Schema::ResultSet::TaskRetries;
use strict;
use warnings;
use utf8;
use base 'DBIx::Class::ResultSet';
use List::Util qw(max);
use Hydra::Math qw(exponential_backoff);
use Hydra::Task;
=head2 get_seconds_to_next_retry
Query the database to identify how soon the next retryable task is due
for being attempted again.
If there are no tasks to be reattempted, it returns undef.
If a task's scheduled retry has passed, it returns 0.
Otherwise, returns the number of seconds to wait before looking for more work.
=cut
sub get_seconds_to_next_retry {
my ($self) = @_;
my $next_retry = $self->search(
{}, # any task
{
order_by => {
-asc => 'retry_at'
},
rows => 1,
}
)->get_column('retry_at')->first;
if (defined($next_retry)) {
return max(0, $next_retry - time());
} else {
return undef;
}
}
=head2 save_task
Save a failing L<Hydra::Task> in the database, with a retry scheduled
for a few seconds away.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> The failing task to retry.
=back
=cut
sub save_task {
my ($self, $task) = @_;
return $self->create({
channel => $task->{"event"}->{"channel_name"},
pluginname => $task->{"plugin_name"},
payload => $task->{"event"}->{"payload"},
attempts => 1,
retry_at => time() + exponential_backoff(1),
});
}
1;

15
src/lib/Hydra/Task.pm Normal file
View file

@ -0,0 +1,15 @@
package Hydra::Task;
use strict;
use warnings;
sub new {
my ($self, $event, $plugin_name) = @_;
return bless {
"event" => $event,
"plugin_name" => $plugin_name,
}, $self;
}
1;

View file

@ -0,0 +1,297 @@
package Hydra::TaskDispatcher;
use strict;
use warnings;
use Hydra::Task;
use Time::HiRes qw( gettimeofday tv_interval );
=head1 Hydra::TaskDispatcher
Excecute many plugins with Hydra::Event as its input.
The TaskDispatcher is responsible for dealing with fanout
from one incoming Event being executed across many plugins,
or one Event being executed against a single plugin by first
wrapping it in a Task.
Its execution model is based on creating a Hydra::Task for
each plugin's execution. The task represents the name of
the plugin to run and the Event to process.
The dispatcher's behavior is slightly different based on
if the Task has an associated record:
=over 1
=item *
If a task succeeds and there is no record, the Dispatcher
assumes there is no further accounting of the task to be
done.
=item *
If a task succeeds and there is a record, the Dispatcher
calls C<delete> on the record.
=item *
If a task fails and there is no record, the Dispatcher
calls C<$store_task> with the Task as its only argument.
It is the C<$store_task>'s responsibility to store the
task in some way for retrying.
=item *
If a task fails and there is a record, the Dispatcher
calls C<requeue> on the record.
=back
=cut
=head2 new
Arguments:
=over 1
=item C<$dbh>
L<DBI::db> The database connection.
=item C<$prometheus>
L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny
or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus.
=item C<%plugins>
L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against.
=item C<$store_task> (Optional)
A sub to call when storing a task for the first time. This sub is called
after a L<Hydra::Task>'s execution fails without an associated record.
The sub is called with the failing task, and is responsible for storing
the task for another attempt.
If no C<$store_task> sub is provided, all failed events are dropped.
=back
=cut
sub new {
my ($self, $db, $prometheus, $plugins, $store_task) = @_;
$prometheus->declare(
"notify_plugin_executions",
type => "counter",
help => "Number of times each plugin has been called by channel."
);
$prometheus->declare(
"notify_plugin_runtime",
type => "histogram",
help => "Number of seconds spent executing each plugin by channel."
);
$prometheus->declare(
"notify_plugin_success",
type => "counter",
help => "Number of successful executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_error",
type => "counter",
help => "Number of failed executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_retry_success",
type => "counter",
help => "Number of successful executions of retried tasks."
);
$prometheus->declare(
"notify_plugin_drop",
type => "counter",
help => "Number of tasks that have been dropped after too many retries."
);
$prometheus->declare(
"notify_plugin_requeue",
type => "counter",
help => "Number of tasks that have been requeued after a failure."
);
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
if (!defined($store_task)) {
$store_task = sub {};
}
my $obj = bless {
"db" => $db,
"prometheus" => $prometheus,
"plugins_by_name" => \%plugins_by_name,
"store_task" => $store_task,
}, $self;
}
=head2 dispatch_event
Execute each configured plugin against the provided L<Hydra::Event>.
Arguments:
=over 1
=item C<$event>
L<Hydra::Event> The event, usually from L<Hydra::PostgresListener>.
=back
=cut
sub dispatch_event {
my ($self, $event) = @_;
foreach my $plugin_name (keys %{$self->{"plugins_by_name"}}) {
my $task = Hydra::Task->new($event, $plugin_name);
$self->dispatch_task($task);
}
}
=head2 dispatch_task
Execute a specific plugin against the provided L<Hydra::Task>.
The Task includes information about what plugin should be executed.
If the provided plugin does not exist, an error logged is logged and the
function returns falsey.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> The task, usually from L<Hydra::Shema::Result::TaskRetries>.
=back
=cut
sub dispatch_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
my $event_labels = $self->prom_labels_for_task($task);
my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
if (!defined($plugin)) {
$self->{"prometheus"}->inc("notify_plugin_no_such_plugin", $event_labels);
print STDERR "No plugin named $plugin_name\n";
return 0;
}
$self->{"prometheus"}->inc("notify_plugin_executions", $event_labels);
eval {
my $start_time = [gettimeofday()];
$task->{"event"}->execute($self->{"db"}, $plugin);
$self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
$self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
$self->success($task);
1;
} or do {
$self->failure($task);
$self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
print STDERR "error running $channel_name hooks: $@\n";
return 0;
}
}
=head2 success
Mark a task's execution as successful.
If the task has an associated record, the record is deleted.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> The task to mark as successful.
=back
=cut
sub success {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
$self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels);
$task->{"record"}->delete();
}
}
=head2 failure
Mark a task's execution as failed.
The task is requeued if it has been attempted fewer than than 100 times.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> The task to mark as successful.
=back
=cut
sub failure {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
if ($task->{"record"}->attempts > 100) {
$self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
$task->{"record"}->delete();
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$task->{"record"}->requeue();
}
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$self->{"store_task"}($task);
}
}
=head2 prom_labels_for_task
Given a specific task, return a hash of standard labels to record with
Prometheus.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> The task to return labels for.
=back
=cut
sub prom_labels_for_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
return {
channel => $channel_name,
plugin => $plugin_name,
};
}
1;

View file

@ -10,9 +10,9 @@ use Hydra::Helper::AddBuilds;
use Hydra::Helper::Nix; use Hydra::Helper::Nix;
use Hydra::Plugin; use Hydra::Plugin;
use Hydra::PostgresListener; use Hydra::PostgresListener;
use Hydra::TaskDispatcher;
use Parallel::ForkManager; use Parallel::ForkManager;
use Prometheus::Tiny::Shared; use Prometheus::Tiny::Shared;
use Time::HiRes qw( gettimeofday tv_interval );
STDERR->autoflush(1); STDERR->autoflush(1);
STDOUT->autoflush(1); STDOUT->autoflush(1);
@ -25,26 +25,6 @@ my $prom = Prometheus::Tiny::Shared->new;
# Add a new declaration for any new metrics you create. Metrics which are # Add a new declaration for any new metrics you create. Metrics which are
# not pre-declared disappear when their value is null. See: # not pre-declared disappear when their value is null. See:
# https://metacpan.org/pod/Prometheus::Tiny#declare # https://metacpan.org/pod/Prometheus::Tiny#declare
$prom->declare(
"notify_plugin_executions",
type => "counter",
help => "Number of times each plugin has been called by channel."
);
$prom->declare(
"notify_plugin_runtime",
type => "histogram",
help => "Number of seconds spent executing each plugin by channel."
);
$prom->declare(
"notify_plugin_success",
type => "counter",
help => "Number of successful executions of this plugin on this channel."
);
$prom->declare(
"notify_plugin_error",
type => "counter",
help => "Number of failed executions of this plugin on this channel."
);
$prom->declare( $prom->declare(
"event_loop_iterations", "event_loop_iterations",
type => "counter", type => "counter",
@ -93,6 +73,15 @@ GetOptions(
my $db = Hydra::Model::DB->new(); my $db = Hydra::Model::DB->new();
my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
my $task_dispatcher = Hydra::TaskDispatcher->new(
$db,
$prom,
[@plugins],
sub {
my ($task) = @_;
$db->resultset("TaskRetries")->save_task($task);
}
);
my $dbh = $db->storage->dbh; my $dbh = $db->storage->dbh;
@ -102,43 +91,22 @@ $listener->subscribe("build_finished");
$listener->subscribe("step_finished"); $listener->subscribe("step_finished");
$listener->subscribe("hydra_notify_dump_metrics"); $listener->subscribe("hydra_notify_dump_metrics");
sub runPluginsForEvent {
my ($event) = @_;
my $channelName = $event->{'channel_name'};
foreach my $plugin (@plugins) {
$prom->inc("notify_plugin_executions", { channel => $channelName, plugin => ref $plugin });
eval {
my $startTime = [gettimeofday()];
$event->execute($db, $plugin);
$prom->histogram_observe("notify_plugin_runtime", tv_interval($startTime), { channel => $channelName, plugin => ref $plugin });
$prom->inc("notify_plugin_success", { channel => $channelName, plugin => ref $plugin });
1;
} or do {
$prom->inc("notify_plugin_error", { channel => $channelName, plugin => ref $plugin });
print STDERR "error running $event->{'channel_name'} hooks: $@\n";
}
}
}
# Process builds that finished while hydra-notify wasn't running. # Process builds that finished while hydra-notify wasn't running.
for my $build ($db->resultset('Builds')->search( for my $build ($db->resultset('Builds')->search(
{ notificationpendingsince => { '!=', undef } })) { notificationpendingsince => { '!=', undef } }))
{ {
print STDERR "sending notifications for build ${\$build->id}...\n"; print STDERR "sending notifications for build ${\$build->id}...\n";
my $event = Hydra::Event->new_event("build_finished", $build->id);
my $event = Hydra::Event::BuildFinished->new($build->id); $task_dispatcher->dispatch_event($event);
runPluginsForEvent($event);
} }
my $taskretries = $db->resultset('TaskRetries');
# Process incoming notifications. # Process incoming notifications.
while (!$queued_only) { while (!$queued_only) {
$prom->inc("event_loop_iterations"); $prom->inc("event_loop_iterations");
my $messages = $listener->block_for_messages(); my $messages = $listener->block_for_messages($taskretries->get_seconds_to_next_retry());
while (my $message = $messages->()) { while (my $message = $messages->()) {
$prom->set("event_received", time()); $prom->set("event_received", time());
my $channelName = $message->{"channel"}; my $channelName = $message->{"channel"};
@ -154,7 +122,7 @@ while (!$queued_only) {
eval { eval {
my $event = Hydra::Event->new_event($channelName, $message->{"payload"}); my $event = Hydra::Event->new_event($channelName, $message->{"payload"});
runPluginsForEvent($event); $task_dispatcher->dispatch_event($event);
1; 1;
} or do { } or do {
@ -162,4 +130,10 @@ while (!$queued_only) {
print STDERR "error processing message '$payload' on channel '$channelName': $@\n"; print STDERR "error processing message '$payload' on channel '$channelName': $@\n";
} }
} }
my $task = $taskretries->getRetryableTask();
if (defined($task)) {
$task_dispatcher->dispatchTask($task);
}
} }

View file

@ -553,6 +553,21 @@ create table StarredJobs (
foreign key (project, jobset) references Jobsets(project, name) on update cascade on delete cascade foreign key (project, jobset) references Jobsets(project, name) on update cascade on delete cascade
); );
-- Events processed by hydra-notify which have failed at least once
--
-- The payload field contains the original, unparsed payload.
--
-- One row is created for each plugin which fails to process the event,
-- with an increasing retry_at and attempts field.
create table TaskRetries (
id serial primary key not null,
channel text not null,
pluginname text not null,
payload text not null,
attempts integer not null,
retry_at integer not null
);
create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc);
-- The output paths that have permanently failed. -- The output paths that have permanently failed.
create table FailedPaths ( create table FailedPaths (

View file

@ -39,6 +39,7 @@ make_schema_at("Hydra::Schema", {
"starredjobs" => "StarredJobs", "starredjobs" => "StarredJobs",
"systemstatus" => "SystemStatus", "systemstatus" => "SystemStatus",
"systemtypes" => "SystemTypes", "systemtypes" => "SystemTypes",
"taskretries" => "TaskRetries",
"urirevmapper" => "UriRevMapper", "urirevmapper" => "UriRevMapper",
"userroles" => "UserRoles", "userroles" => "UserRoles",
"users" => "Users", "users" => "Users",

15
src/sql/upgrade-77.sql Normal file
View file

@ -0,0 +1,15 @@
-- Events processed by hydra-notify which have failed at least once
--
-- The payload field contains the original, unparsed payload.
--
-- One row is created for each plugin which fails to process the event,
-- with an increasing retry_at and attempts field.
create table TaskRetries (
id serial primary key not null,
channel text not null,
pluginname text not null,
payload text not null,
attempts integer not null,
retry_at integer not null
);
create index IndexTaskRetriesOrdered on TaskRetries(retry_at asc);

19
t/Math.t Normal file
View file

@ -0,0 +1,19 @@
use strict;
use warnings;
use Setup;
use Hydra::Math qw(exponential_backoff);
use Test2::V0;
subtest "exponential_backoff" => sub {
is(exponential_backoff(0), 1);
is(exponential_backoff(1), 2);
is(exponential_backoff(2), 4);
is(exponential_backoff(9), 512);
is(exponential_backoff(10), 1024);
is(exponential_backoff(11), 1024, "we're clamped to 1024 seconds");
is(exponential_backoff(11000), 1024, "we're clamped to 1024 seconds");
};
done_testing;

View file

@ -0,0 +1,35 @@
use strict;
use warnings;
use Setup;
my %ctx = test_init();
require Hydra::Schema;
require Hydra::Model::DB;
use Test2::V0;
my $db = Hydra::Model::DB->new;
hydra_setup($db);
my $taskretries = $db->resultset('TaskRetries');
subtest "requeue" => sub {
my $task = $taskretries->create({
channel => "bogus",
pluginname => "bogus",
payload => "bogus",
attempts => 1,
retry_at => time(),
});
$task->requeue();
is($task->attempts, 2, "We should have stored a second retry");
is($task->retry_at, within(time() + 4, 2), "Delayed two exponential backoff step");
$task->requeue();
is($task->attempts, 3, "We should have stored a third retry");
is($task->retry_at, within(time() + 8, 2), "Delayed a third exponential backoff step");
};
done_testing;

View file

@ -0,0 +1,63 @@
use strict;
use warnings;
use Setup;
my %ctx = test_init();
use Hydra::Event;
use Hydra::Task;
require Hydra::Schema;
require Hydra::Model::DB;
use Test2::V0;
my $db = Hydra::Model::DB->new;
hydra_setup($db);
my $taskretries = $db->resultset('TaskRetries');
subtest "get_seconds_to_next_retry" => sub {
subtest "Without any records in the database" => sub {
is($taskretries->get_seconds_to_next_retry(), undef, "Without any records our next retry moment is forever away.");
};
subtest "With only tasks whose retry timestamps are in the future" => sub {
$taskretries->create({
channel => "bogus",
pluginname => "bogus",
payload => "bogus",
attempts => 1,
retry_at => time() + 100,
});
is($taskretries->get_seconds_to_next_retry(), within(100, 2), "We should retry in roughly 100 seconds");
};
subtest "With tasks whose retry timestamp are in the past" => sub {
$taskretries->create({
channel => "bogus",
pluginname => "bogus",
payload => "bogus",
attempts => 1,
retry_at => time() - 100,
});
is($taskretries->get_seconds_to_next_retry(), 0, "We should retry immediately");
}
};
subtest "save_task" => sub {
my $event = Hydra::Event->new_event("build_started", "1");
my $task = Hydra::Task->new(
$event,
"FooPluginName",
);
my $retry = $taskretries->save_task($task);
is($retry->channel, "build_started", "Channel name should match");
is($retry->pluginname, "FooPluginName", "Plugin name should match");
is($retry->payload, "1", "Payload should match");
is($retry->attempts, 1, "We've had one attempt");
is($retry->retry_at, within(time() + 1, 2), "The retry at should be approximately one second away");
};
done_testing;

222
t/TaskDispatcher.t Normal file
View file

@ -0,0 +1,222 @@
use strict;
use warnings;
use Setup;
use Hydra::TaskDispatcher;
use Prometheus::Tiny::Shared;
use Test2::V0;
use Test2::Tools::Mock qw(mock_obj);
my $db = "bogus db";
my $prometheus = Prometheus::Tiny::Shared->new;
sub make_noop_plugin {
my ($name) = @_;
my $plugin = {
"name" => $name,
};
my $mock_plugin = mock_obj $plugin => ();
return $mock_plugin;
}
sub make_fake_event {
my ($channel_name) = @_;
my $event = {
channel_name => $channel_name,
called_with => [],
};
my $mock_event = mock_obj $event => (
add => [
"execute" => sub {
my ($self, $db, $plugin) = @_;
push @{$self->{"called_with"}}, $plugin;
}
]
);
return $mock_event;
}
sub make_failing_event {
my ($channel_name) = @_;
my $event = {
channel_name => $channel_name,
called_with => [],
};
my $mock_event = mock_obj $event => (
add => [
"execute" => sub {
my ($self, $db, $plugin) = @_;
push @{$self->{"called_with"}}, $plugin;
die "Failing plugin."
}
]
);
return $mock_event;
}
sub make_fake_record {
my %attrs = @_;
my $record = {
"attempts" => $attrs{"attempts"} || 0,
"requeued" => 0,
"deleted" => 0
};
my $mock_record = mock_obj $record => (
add => [
"delete" => sub {
my ($self, $db, $plugin) = @_;
$self->{"deleted"} = 1;
},
"requeue" => sub {
my ($self, $db, $plugin) = @_;
$self->{"requeued"} = 1;
}
]
);
return $mock_record;
}
subtest "dispatch_event" => sub {
subtest "every plugin gets called once, even if it fails all of them." => sub {
my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_failing_event("bogus-channel");
$dispatcher->dispatch_event($event);
is(@{$event->{"called_with"}}, 2, "Both plugins should be called");
my @expected_names = [ "bogus-1", "bogus-2" ];
my @actual_names = sort([
$event->{"called_with"}[0]->name,
$event->{"called_with"}[1]->name
]);
is(
@actual_names,
@expected_names,
"Both plugins should be executed, but not in any particular order."
);
};
};
subtest "dispatch_task" => sub {
subtest "every plugin gets called once" => sub {
my $bogus_plugin = make_noop_plugin("bogus-1");
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_fake_event("bogus-channel");
my $task = Hydra::Task->new($event, ref $bogus_plugin);
is($dispatcher->dispatch_task($task), 1, "Calling dispatch_task returns truthy.");
is(@{$event->{"called_with"}}, 1, "Just one plugin should be called");
is(
$event->{"called_with"}[0]->name,
"bogus-1",
"Just bogus-1 should be executed."
);
};
subtest "a task with an invalid plugin is not fatal" => sub {
my $bogus_plugin = make_noop_plugin("bogus-1");
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, @plugins);
my $event = make_fake_event("bogus-channel");
my $task = Hydra::Task->new($event, "this-plugin-does-not-exist");
is($dispatcher->dispatch_task($task), 0, "Calling dispatch_task returns falsey.");
is(@{$event->{"called_with"}}, 0, "No plugins are called");
};
subtest "a failed run without a record saves the task for later" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => undef,
};
my $save_hook_called = 0;
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin],
sub {
$save_hook_called = 1;
}
);
$dispatcher->dispatch_task($task);
is($save_hook_called, 1, "The record was requeued with the store hook.");
};
subtest "a successful run from a record deletes the record" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_fake_event("success-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};
subtest "a failed run from a record re-queues the task" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"requeued"}, 1, "The record was requeued.");
};
subtest "a failed run from a record with a lot of attempts deletes the task" => sub {
my $db = "bogus db";
my $record = make_fake_record(attempts => 101);
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};
};
done_testing;