diff --git a/src/lib/Hydra/PostgresListener.pm b/src/lib/Hydra/PostgresListener.pm new file mode 100644 index 00000000..0e705d01 --- /dev/null +++ b/src/lib/Hydra/PostgresListener.pm @@ -0,0 +1,105 @@ +package Hydra::PostgresListener; + +use strict; +use warnings; +use IO::Select; + +=head1 Hydra::PostgresListener + +An abstraction around using Postgres' LISTEN / NOTIFY in an event loop. + +=cut + +=head2 new + +Arguments: + +=over 1 + +=item C<$dbh> +L The database connection. + +=back + +=cut + +sub new { + my ($self, $dbh) = @_; + my $sel = IO::Select->new($dbh->func("getfd")); + + return bless { + "dbh" => $dbh, + "sel" => $sel, + }, $self; +} + +=head2 subscribe + +Subscribe to the named channel for messages + +Arguments: + +=over 1 + +=item C<$channel> + +The channel name. + +=back + +=cut + +sub subscribe { + my ($self, $channel) = @_; + $channel = $self->{'dbh'}->quote_identifier($channel); + $self->{'dbh'}->do("listen $channel"); +} + +=head2 block_for_messages + +Wait for messages to arrive within the specified timeout. + +Arguments: + +=over 1 + +=item C<$timeout> +The maximum number of seconds to wait for messages. + +Optional: if unspecified, block forever. + +=back + +Returns: a sub, call the sub repeatedly to get a message. The sub +will return undef when there are no pending messages. + +Example: + + my $events = $listener->block_for_messages(); + while (my $message = $events->()) { + ... + } + +=cut + +sub block_for_messages { + my ($self, $timeout) = @_; + + $self->{'sel'}->can_read($timeout); + + return sub { + my $notify = $self->{'dbh'}->func("pg_notifies"); + if (defined($notify)) { + my ($channelName, $pid, $payload) = @$notify; + return { + channel => $channelName, + pid => $pid, + payload => $payload, + } + } else { + return undef + } + } +} + +1; diff --git a/src/script/hydra-notify b/src/script/hydra-notify index 08233142..22e027ba 100755 --- a/src/script/hydra-notify +++ b/src/script/hydra-notify @@ -3,9 +3,9 @@ use strict; use utf8; use Hydra::Plugin; +use Hydra::PostgresListener; use Hydra::Helper::Nix; use Hydra::Helper::AddBuilds; -use IO::Select; use Getopt::Long; STDERR->autoflush(1); @@ -26,9 +26,10 @@ my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config); my $dbh = $db->storage->dbh; -$dbh->do("listen build_started"); -$dbh->do("listen build_finished"); -$dbh->do("listen step_finished"); +my $listener = Hydra::PostgresListener->new($dbh); +$listener->subscribe("build_started"); +$listener->subscribe("build_finished"); +$listener->subscribe("step_finished"); sub buildStarted { my ($buildId) = @_; @@ -115,15 +116,13 @@ for my $build ($db->resultset('Builds')->search( # Process incoming notifications. -my $fd = $dbh->func("getfd"); -my $sel = IO::Select->new($fd); - while (!$queued_only) { - $sel->can_read; + my $messages = $listener->block_for_messages(); + while (my $message = $messages->()) { - while (my $notify = $dbh->func("pg_notifies")) { - - my ($channelName, $pid, $payload) = @$notify; + my $channelName = $message->{"channel"}; + my $pid = $message->{"pid"}; + my $payload = $message->{"payload"}; #print STDERR "got '$channelName' from $pid: $payload\n"; my @payload = split /\t/, $payload; diff --git a/t/PostgresListener.t b/t/PostgresListener.t new file mode 100644 index 00000000..ac9ec502 --- /dev/null +++ b/t/PostgresListener.t @@ -0,0 +1,79 @@ +use strict; +use Setup; + +my %ctx = test_init(); + +require Hydra::Model::DB; + +use Hydra::PostgresListener; +use Test2::V0; + +my $db = Hydra::Model::DB->new; +my $dbh = $db->storage->dbh; + +my $listener = Hydra::PostgresListener->new($dbh); + +$listener->subscribe("foo"); +$listener->subscribe("bar"); + +is(undef, $listener->block_for_messages(0)->(), "There is no message"); +is(undef, $listener->block_for_messages(0)->(), "There is no message"); +is(undef, $listener->block_for_messages(0)->(), "There is no message"); + +$dbh->do("notify foo, ?", undef, "hi"); +my $event = $listener->block_for_messages(0)->(); +is($event->{'channel'}, "foo", "The channel matches"); +isnt($event->{'pid'}, undef, "The pid is set"); +is($event->{'payload'}, "hi", "The payload matches"); + +is(undef, $listener->block_for_messages(0)->(), "There is no message"); + + +like( + dies { + local $SIG{ALRM} = sub { die "timeout" }; + alarm 1; + $listener->block_for_messages->(); + alarm 0; + }, + qr/timeout/, + "An unspecified block should block forever" +); + +like( + dies { + local $SIG{ALRM} = sub { die "timeout" }; + alarm 1; + $listener->block_for_messages(2)->(); + alarm 0; + }, + qr/timeout/, + "A 2-second block goes longer than 1 second" +); + +ok( + lives { + local $SIG{ALRM} = sub { die "timeout" }; + alarm 2; + is(undef, $listener->block_for_messages(1)->(), "A one second block returns undef data after timeout"); + alarm 0; + }, + "A 1-second block expires within 2 seconds" +); + +subtest "with wacky channel names" => sub { + my $channel = "foo! very weird channel names...; select * from t where 1 = 1"; + my $escapedChannel = $dbh->quote_identifier($channel); + + $listener->subscribe($channel); + + is(undef, $listener->block_for_messages(0)->(), "There is no message"); + + $dbh->do("notify $escapedChannel, ?", undef, "hi"); + my $event = $listener->block_for_messages(0)->(); + is($event->{'channel'}, $channel, "The channel matches"); + isnt($event->{'pid'}, undef, "The pid is set"); + is($event->{'payload'}, "hi", "The payload matches"); +}; + +done_testing;