TaskDispatcher: Support re-queueing tasks, and dropping tasks after 100 failures.

This commit is contained in:
Graham Christensen 2021-08-26 17:00:39 -04:00
parent d0b0fc21b3
commit b0055a23df
2 changed files with 249 additions and 9 deletions

View file

@ -18,6 +18,32 @@ Its execution model is based on creating a Hydra::Task for
each plugin's execution. The task represents the name of
the plugin to run and the Event to process.
The dispatcher's behavior is slightly different based on
if the Task has an associated record:
=over 1
=item *
If a task succeeds and there is no record, the Dispatcher
assumes there is no further accounting of the task to be
done.
=item *
If a task succeeds and there is a record, the Dispatcher
calls C<delete> on the record.
=item *
If a task fails and there is no record, the Dispatcher
calls C<$store_task> with the Task as its only argument.
It is the C<$store_task>'s responsibility to store the
task in some way for retrying.
=item *
If a task fails and there is a record, the Dispatcher
calls C<requeue> on the record.
=back
=cut
=head2 new
@ -29,23 +55,27 @@ Arguments:
=item C<$dbh>
L<DBI::db> The database connection.
=back
=item C<$prometheus>
L<Prometheus::Tiny> A Promethues implementation, either Prometheus::Tiny
or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus.
=back
=item C<%plugins>
L<Hydra::Plugin> A list of Hydra plugins to execute events and tasks against.
=item C<$store_task> (Optional)
A sub to call when storing a task for the first time. This sub is called
after a L<Hydra::Task>'s execution fails without an associated record.
The sub is called with the failing task, and is responsible for storing
the task for another attempt.
If no C<$store_task> sub is provided, all failed events are dropped.
=back
=cut
sub new {
my ($self, $db, $prometheus, $plugins) = @_;
my ($self, $db, $prometheus, $plugins, $store_task) = @_;
$prometheus->declare(
"notify_plugin_executions",
@ -67,13 +97,33 @@ sub new {
type => "counter",
help => "Number of failed executions of this plugin on this channel."
);
$prometheus->declare(
"notify_plugin_retry_success",
type => "counter",
help => "Number of successful executions of retried tasks."
);
$prometheus->declare(
"notify_plugin_drop",
type => "counter",
help => "Number of tasks that have been dropped after too many retries."
);
$prometheus->declare(
"notify_plugin_requeue",
type => "counter",
help => "Number of tasks that have been requeued after a failure."
);
my %plugins_by_name = map { ref $_ => $_ } @{$plugins};
if (!defined($store_task)) {
$store_task = sub {};
}
my $obj = bless {
"db" => $db,
"prometheus" => $prometheus,
"plugins_by_name" => \%plugins_by_name,
"store_task" => $store_task,
}, $self;
}
@ -125,10 +175,7 @@ sub dispatch_task {
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
my $event_labels = {
channel => $channel_name,
plugin => $plugin_name,
};
my $event_labels = $self->prom_labels_for_task($task);
my $plugin = $self->{"plugins_by_name"}->{$plugin_name};
@ -146,12 +193,105 @@ sub dispatch_task {
$self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels);
$self->{"prometheus"}->inc("notify_plugin_success", $event_labels);
$self->success($task);
1;
} or do {
$self->failure($task);
$self->{"prometheus"}->inc("notify_plugin_error", $event_labels);
print STDERR "error running $channel_name hooks: $@\n";
return 0;
}
}
=head2 success
Mark a task's execution as successful.
If the task has an associated record, the record is deleted.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task to mark as successful.
=back
=cut
sub success {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
$self->{"prometheus"}->inc("notify_plugin_retry_sucess", $event_labels);
$task->{"record"}->delete();
}
}
=head2 failure
Mark a task's execution as failed.
The task is requeued if it has been attempted fewer than than 100 times.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task to mark as successful.
=back
=cut
sub failure {
my ($self, $task) = @_;
my $event_labels = $self->prom_labels_for_task($task);
if (defined($task->{"record"})) {
if ($task->{"record"}->{"attempts"} > 100) {
$self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
$task->{"record"}->delete();
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$task->{"record"}->requeue();
}
} else {
$self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
$self->{"store_task"}($task);
}
}
=head2 prom_labels_for_task
Given a specific task, return a hash of standard labels to record with
Prometheus.
Arguments:
=over 1
=item C<$task>
L<Hydra::Task> the task to return labels for.
=back
=cut
sub prom_labels_for_task {
my ($self, $task) = @_;
my $channel_name = $task->{"event"}->{'channel_name'};
my $plugin_name = $task->{"plugin_name"};
return {
channel => $channel_name,
plugin => $plugin_name,
};
}
1;

View file

@ -60,6 +60,31 @@ sub make_failing_event {
return $mock_event;
}
sub make_fake_record {
my %attrs = @_;
my $record = {
"attempts" => $attrs{"attempts"} || 0,
"requeued" => 0,
"deleted" => 0
};
my $mock_record = mock_obj $record => (
add => [
"delete" => sub {
my ($self, $db, $plugin) = @_;
$self->{"deleted"} = 1;
},
"requeue" => sub {
my ($self, $db, $plugin) = @_;
$self->{"requeued"} = 1;
}
]
);
return $mock_record;
}
subtest "dispatch_event" => sub {
subtest "every plugin gets called once, even if it fails all of them." => sub {
my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
@ -104,6 +129,7 @@ subtest "dispatch_task" => sub {
"Just bogus-1 should be executed."
);
};
subtest "a task with an invalid plugin is not fatal" => sub {
my $bogus_plugin = make_noop_plugin("bogus-1");
my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")];
@ -116,6 +142,80 @@ subtest "dispatch_task" => sub {
is(@{$event->{"called_with"}}, 0, "No plugins are called");
};
subtest "a failed run without a record saves the task for later" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => undef,
};
my $save_hook_called = 0;
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin],
sub {
$save_hook_called = 1;
}
);
$dispatcher->dispatch_task($task);
is($save_hook_called, 1, "The record was requeued with the store hook.");
};
subtest "a successful run from a record deletes the record" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_fake_event("success-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};
subtest "a failed run from a record re-queues the task" => sub {
my $db = "bogus db";
my $record = make_fake_record();
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"requeued"}, 1, "The record was requeued.");
};
subtest "a failed run from a record with a lot of attempts deletes the task" => sub {
my $db = "bogus db";
my $record = make_fake_record(attempts => 101);
my $bogus_plugin = make_noop_plugin("bogus-1");
my $task = {
"event" => make_failing_event("fail-event"),
"plugin_name" => ref $bogus_plugin,
"record" => $record,
};
my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]);
$dispatcher->dispatch_task($task);
is($record->{"deleted"}, 1, "The record was deleted.");
};
};