From b0055a23df0762061daa1749756dde5f298027ae Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 26 Aug 2021 17:00:39 -0400 Subject: [PATCH] TaskDispatcher: Support re-queueing tasks, and dropping tasks after 100 failures. --- src/lib/Hydra/TaskDispatcher.pm | 158 ++++++++++++++++++++++++++++++-- t/TaskDispatcher.t | 100 ++++++++++++++++++++ 2 files changed, 249 insertions(+), 9 deletions(-) diff --git a/src/lib/Hydra/TaskDispatcher.pm b/src/lib/Hydra/TaskDispatcher.pm index 425a89a3..7c472b20 100644 --- a/src/lib/Hydra/TaskDispatcher.pm +++ b/src/lib/Hydra/TaskDispatcher.pm @@ -18,6 +18,32 @@ Its execution model is based on creating a Hydra::Task for each plugin's execution. The task represents the name of the plugin to run and the Event to process. +The dispatcher's behavior is slightly different based on +if the Task has an associated record: + +=over 1 + +=item * +If a task succeeds and there is no record, the Dispatcher +assumes there is no further accounting of the task to be +done. + +=item * +If a task succeeds and there is a record, the Dispatcher +calls C<delete> on the record. + +=item * +If a task fails and there is no record, the Dispatcher +calls C<$store_task> with the Task as its only argument. +It is the C<$store_task>'s responsibility to store the +task in some way for retrying. + +=item * +If a task fails and there is a record, the Dispatcher +calls C<requeue> on the record (or C<delete> once it has exceeded 100 attempts). + +=back + =cut =head2 new @@ -29,23 +55,27 @@ Arguments: =item C<$dbh> L The database connection. -=back - =item C<$prometheus> L A Promethues implementation, either Prometheus::Tiny or Prometheus::Tiny::Shared. Not compatible with Net::Prometheus. -=back - =item C<%plugins> L A list of Hydra plugins to execute events and tasks against. +=item C<$store_task> (Optional) +A sub to call when storing a task for the first time. This sub is called +after a L<Hydra::Task>'s execution fails without an associated record. 
+The sub is called with the failing task, and is responsible for storing +the task for another attempt. + +If no C<$store_task> sub is provided, all failed events are dropped. + =back =cut sub new { - my ($self, $db, $prometheus, $plugins) = @_; + my ($self, $db, $prometheus, $plugins, $store_task) = @_; $prometheus->declare( "notify_plugin_executions", @@ -67,13 +97,33 @@ sub new { type => "counter", help => "Number of failed executions of this plugin on this channel." ); + $prometheus->declare( + "notify_plugin_retry_success", + type => "counter", + help => "Number of successful executions of retried tasks." + ); + $prometheus->declare( + "notify_plugin_drop", + type => "counter", + help => "Number of tasks that have been dropped after too many retries." + ); + $prometheus->declare( + "notify_plugin_requeue", + type => "counter", + help => "Number of tasks that have been requeued after a failure." + ); my %plugins_by_name = map { ref $_ => $_ } @{$plugins}; + if (!defined($store_task)) { + $store_task = sub {}; + } + my $obj = bless { "db" => $db, "prometheus" => $prometheus, "plugins_by_name" => \%plugins_by_name, + "store_task" => $store_task, }, $self; } @@ -125,10 +175,7 @@ sub dispatch_task { my $channel_name = $task->{"event"}->{'channel_name'}; my $plugin_name = $task->{"plugin_name"}; - my $event_labels = { - channel => $channel_name, - plugin => $plugin_name, - }; + my $event_labels = $self->prom_labels_for_task($task); my $plugin = $self->{"plugins_by_name"}->{$plugin_name}; @@ -146,12 +193,105 @@ sub dispatch_task { $self->{"prometheus"}->histogram_observe("notify_plugin_runtime", tv_interval($start_time), $event_labels); $self->{"prometheus"}->inc("notify_plugin_success", $event_labels); + $self->success($task); 1; } or do { + $self->failure($task); $self->{"prometheus"}->inc("notify_plugin_error", $event_labels); print STDERR "error running $channel_name hooks: $@\n"; return 0; } } +=head2 success + +Mark a task's execution as successful. 
+
+If the task has an associated record, the record is deleted.
+
+Arguments:
+
+=over 1
+
+=item C<$task>
+
+L<Hydra::Task> the task to mark as successful.
+
+=back
+
+=cut
+sub success {
+    my ($self, $task) = @_;
+
+    my $event_labels = $self->prom_labels_for_task($task);
+    # Without a record there is nothing to count or clean up.
+    if (defined($task->{"record"})) {
+        $self->{"prometheus"}->inc("notify_plugin_retry_success", $event_labels);
+        $task->{"record"}->delete();
+    }
+}
+
+=head2 failure
+
+Mark a task's execution as failed. A task with a record is requeued, or
+dropped (its record deleted) once the record shows more than 100 attempts.
+A task without a record is handed to the C<$store_task> sub instead.
+
+Arguments:
+
+=over 1
+
+=item C<$task>
+
+L<Hydra::Task> the task to mark as failed.
+
+=back
+
+=cut
+sub failure {
+    my ($self, $task) = @_;
+
+    my $event_labels = $self->prom_labels_for_task($task);
+    # With a record: requeue or drop it. Without one: hand the task to store_task.
+    if (defined($task->{"record"})) {
+        if ($task->{"record"}->{"attempts"} > 100) { # strictly more than 100: a record with exactly 100 attempts is still requeued
+            $self->{"prometheus"}->inc("notify_plugin_drop", $event_labels);
+            $task->{"record"}->delete();
+        } else {
+            $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
+            $task->{"record"}->requeue();
+        }
+    } else {
+        $self->{"prometheus"}->inc("notify_plugin_requeue", $event_labels);
+        $self->{"store_task"}($task);
+    }
+}
+
+=head2 prom_labels_for_task
+
+Given a specific task, return a hash of standard labels to record with
+Prometheus.
+
+Arguments:
+
+=over 1
+
+=item C<$task>
+
+L<Hydra::Task> the task to return labels for.
+
+=back
+
+=cut
+sub prom_labels_for_task {
+    my ($self, $task) = @_;
+
+    # Both labels are read straight off the task; a missing field stays undef.
+    my %labels = (
+        channel => $task->{"event"}->{'channel_name'},
+        plugin  => $task->{"plugin_name"},
+    );
+    return \%labels;
+}
+
 1;
diff --git a/t/TaskDispatcher.t b/t/TaskDispatcher.t
index ef3a8579..c055ce45 100644
--- a/t/TaskDispatcher.t
+++ b/t/TaskDispatcher.t
@@ -60,6 +60,31 @@ sub make_failing_event {
     return $mock_event;
 }
 
+sub make_fake_record {
+    my %attrs = @_;
+    # Backing hash for the mock: "attempts" comes from the caller, both flags start at 0.
+    my $state = {
+        "attempts" => $attrs{"attempts"} || 0,
+        "requeued" => 0,
+        "deleted" => 0
+    };
+    # delete/requeue only raise their flag, so a test can see which one ran.
+    my $mock = mock_obj $state => (
+        add => [
+            "delete" => sub {
+                my ($record) = @_;
+                $record->{"deleted"} = 1;
+            },
+            "requeue" => sub {
+                my ($record) = @_;
+                $record->{"requeued"} = 1;
+            }
+        ]
+    );
+
+    return $mock;
+}
+
 subtest "dispatch_event" => sub {
     subtest "every plugin gets called once, even if it fails all of them." => sub {
         my @plugins = [make_noop_plugin("bogus-1"), make_noop_plugin("bogus-2")];
@@ -104,6 +129,7 @@ subtest "dispatch_task" => sub {
             "Just bogus-1 should be executed."
); }; + subtest "a task with an invalid plugin is not fatal" => sub { my $bogus_plugin = make_noop_plugin("bogus-1"); my @plugins = [$bogus_plugin, make_noop_plugin("bogus-2")]; @@ -116,6 +142,80 @@ subtest "dispatch_task" => sub { is(@{$event->{"called_with"}}, 0, "No plugins are called"); }; + + subtest "a failed run without a record saves the task for later" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => undef, + }; + + my $save_hook_called = 0; + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin], + sub { + $save_hook_called = 1; + } + ); + $dispatcher->dispatch_task($task); + + is($save_hook_called, 1, "The record was requeued with the store hook."); + }; + + subtest "a successful run from a record deletes the record" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_fake_event("success-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; + + subtest "a failed run from a record re-queues the task" => sub { + my $db = "bogus db"; + + my $record = make_fake_record(); + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"requeued"}, 1, "The record was requeued."); + }; + + subtest "a failed run from a record with a lot of attempts deletes the task" => sub { + my $db = "bogus db"; + + my 
$record = make_fake_record(attempts => 101); + + my $bogus_plugin = make_noop_plugin("bogus-1"); + my $task = { + "event" => make_failing_event("fail-event"), + "plugin_name" => ref $bogus_plugin, + "record" => $record, + }; + + my $dispatcher = Hydra::TaskDispatcher->new($db, $prometheus, [$bogus_plugin]); + $dispatcher->dispatch_task($task); + + is($record->{"deleted"}, 1, "The record was deleted."); + }; };