forked from lix-project/hydra
Merge pull request #997 from DeterminateSystems/abstract-listener
Abstract over postgres' LISTEN/NOTIFY
This commit is contained in:
commit
0f958f3425
105
src/lib/Hydra/PostgresListener.pm
Normal file
105
src/lib/Hydra/PostgresListener.pm
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
package Hydra::PostgresListener;
|
||||||
|
|
||||||
|
use strict;
|
||||||
|
use warnings;
|
||||||
|
use IO::Select;
|
||||||
|
|
||||||
|
=head1 Hydra::PostgresListener
|
||||||
|
|
||||||
|
An abstraction around using Postgres' LISTEN / NOTIFY in an event loop.
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
=head2 new
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$dbh>
|
||||||
|
L<DBI::db> The database connection.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub new {
|
||||||
|
my ($self, $dbh) = @_;
|
||||||
|
my $sel = IO::Select->new($dbh->func("getfd"));
|
||||||
|
|
||||||
|
return bless {
|
||||||
|
"dbh" => $dbh,
|
||||||
|
"sel" => $sel,
|
||||||
|
}, $self;
|
||||||
|
}
|
||||||
|
|
||||||
|
=head2 subscribe
|
||||||
|
|
||||||
|
Subscribe to the named channel for messages
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$channel>
|
||||||
|
|
||||||
|
The channel name.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub subscribe {
|
||||||
|
my ($self, $channel) = @_;
|
||||||
|
$channel = $self->{'dbh'}->quote_identifier($channel);
|
||||||
|
$self->{'dbh'}->do("listen $channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
=head2 block_for_messages
|
||||||
|
|
||||||
|
Wait for messages to arrive within the specified timeout.
|
||||||
|
|
||||||
|
Arguments:
|
||||||
|
|
||||||
|
=over 1
|
||||||
|
|
||||||
|
=item C<$timeout>
|
||||||
|
The maximum number of seconds to wait for messages.
|
||||||
|
|
||||||
|
Optional: if unspecified, block forever.
|
||||||
|
|
||||||
|
=back
|
||||||
|
|
||||||
|
Returns: a sub, call the sub repeatedly to get a message. The sub
|
||||||
|
will return undef when there are no pending messages.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
my $events = $listener->block_for_messages();
|
||||||
|
while (my $message = $events->()) {
|
||||||
|
...
|
||||||
|
}
|
||||||
|
|
||||||
|
=cut
|
||||||
|
|
||||||
|
sub block_for_messages {
|
||||||
|
my ($self, $timeout) = @_;
|
||||||
|
|
||||||
|
$self->{'sel'}->can_read($timeout);
|
||||||
|
|
||||||
|
return sub {
|
||||||
|
my $notify = $self->{'dbh'}->func("pg_notifies");
|
||||||
|
if (defined($notify)) {
|
||||||
|
my ($channelName, $pid, $payload) = @$notify;
|
||||||
|
return {
|
||||||
|
channel => $channelName,
|
||||||
|
pid => $pid,
|
||||||
|
payload => $payload,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return undef
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
1;
|
|
@ -3,9 +3,9 @@
|
||||||
use strict;
|
use strict;
|
||||||
use utf8;
|
use utf8;
|
||||||
use Hydra::Plugin;
|
use Hydra::Plugin;
|
||||||
|
use Hydra::PostgresListener;
|
||||||
use Hydra::Helper::Nix;
|
use Hydra::Helper::Nix;
|
||||||
use Hydra::Helper::AddBuilds;
|
use Hydra::Helper::AddBuilds;
|
||||||
use IO::Select;
|
|
||||||
use Getopt::Long;
|
use Getopt::Long;
|
||||||
|
|
||||||
STDERR->autoflush(1);
|
STDERR->autoflush(1);
|
||||||
|
@ -26,9 +26,10 @@ my @plugins = Hydra::Plugin->instantiate(db => $db, config => $config);
|
||||||
|
|
||||||
my $dbh = $db->storage->dbh;
|
my $dbh = $db->storage->dbh;
|
||||||
|
|
||||||
$dbh->do("listen build_started");
|
my $listener = Hydra::PostgresListener->new($dbh);
|
||||||
$dbh->do("listen build_finished");
|
$listener->subscribe("build_started");
|
||||||
$dbh->do("listen step_finished");
|
$listener->subscribe("build_finished");
|
||||||
|
$listener->subscribe("step_finished");
|
||||||
|
|
||||||
sub buildStarted {
|
sub buildStarted {
|
||||||
my ($buildId) = @_;
|
my ($buildId) = @_;
|
||||||
|
@ -115,15 +116,13 @@ for my $build ($db->resultset('Builds')->search(
|
||||||
|
|
||||||
|
|
||||||
# Process incoming notifications.
|
# Process incoming notifications.
|
||||||
my $fd = $dbh->func("getfd");
|
|
||||||
my $sel = IO::Select->new($fd);
|
|
||||||
|
|
||||||
while (!$queued_only) {
|
while (!$queued_only) {
|
||||||
$sel->can_read;
|
my $messages = $listener->block_for_messages();
|
||||||
|
while (my $message = $messages->()) {
|
||||||
|
|
||||||
while (my $notify = $dbh->func("pg_notifies")) {
|
my $channelName = $message->{"channel"};
|
||||||
|
my $pid = $message->{"pid"};
|
||||||
my ($channelName, $pid, $payload) = @$notify;
|
my $payload = $message->{"payload"};
|
||||||
#print STDERR "got '$channelName' from $pid: $payload\n";
|
#print STDERR "got '$channelName' from $pid: $payload\n";
|
||||||
|
|
||||||
my @payload = split /\t/, $payload;
|
my @payload = split /\t/, $payload;
|
||||||
|
|
79
t/PostgresListener.t
Normal file
79
t/PostgresListener.t
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
use strict;
|
||||||
|
use Setup;
|
||||||
|
|
||||||
|
my %ctx = test_init();
|
||||||
|
|
||||||
|
require Hydra::Model::DB;
|
||||||
|
|
||||||
|
use Hydra::PostgresListener;
|
||||||
|
use Test2::V0;
|
||||||
|
|
||||||
|
my $db = Hydra::Model::DB->new;
|
||||||
|
my $dbh = $db->storage->dbh;
|
||||||
|
|
||||||
|
my $listener = Hydra::PostgresListener->new($dbh);
|
||||||
|
|
||||||
|
$listener->subscribe("foo");
|
||||||
|
$listener->subscribe("bar");
|
||||||
|
|
||||||
|
is(undef, $listener->block_for_messages(0)->(), "There is no message");
|
||||||
|
is(undef, $listener->block_for_messages(0)->(), "There is no message");
|
||||||
|
is(undef, $listener->block_for_messages(0)->(), "There is no message");
|
||||||
|
|
||||||
|
$dbh->do("notify foo, ?", undef, "hi");
|
||||||
|
my $event = $listener->block_for_messages(0)->();
|
||||||
|
is($event->{'channel'}, "foo", "The channel matches");
|
||||||
|
isnt($event->{'pid'}, undef, "The pid is set");
|
||||||
|
is($event->{'payload'}, "hi", "The payload matches");
|
||||||
|
|
||||||
|
is(undef, $listener->block_for_messages(0)->(), "There is no message");
|
||||||
|
|
||||||
|
|
||||||
|
like(
|
||||||
|
dies {
|
||||||
|
local $SIG{ALRM} = sub { die "timeout" };
|
||||||
|
alarm 1;
|
||||||
|
$listener->block_for_messages->();
|
||||||
|
alarm 0;
|
||||||
|
},
|
||||||
|
qr/timeout/,
|
||||||
|
"An unspecified block should block forever"
|
||||||
|
);
|
||||||
|
|
||||||
|
like(
|
||||||
|
dies {
|
||||||
|
local $SIG{ALRM} = sub { die "timeout" };
|
||||||
|
alarm 1;
|
||||||
|
$listener->block_for_messages(2)->();
|
||||||
|
alarm 0;
|
||||||
|
},
|
||||||
|
qr/timeout/,
|
||||||
|
"A 2-second block goes longer than 1 second"
|
||||||
|
);
|
||||||
|
|
||||||
|
ok(
|
||||||
|
lives {
|
||||||
|
local $SIG{ALRM} = sub { die "timeout" };
|
||||||
|
alarm 2;
|
||||||
|
is(undef, $listener->block_for_messages(1)->(), "A one second block returns undef data after timeout");
|
||||||
|
alarm 0;
|
||||||
|
},
|
||||||
|
"A 1-second block expires within 2 seconds"
|
||||||
|
);
|
||||||
|
|
||||||
|
subtest "with wacky channel names" => sub {
|
||||||
|
my $channel = "foo! very weird channel names...; select * from t where 1 = 1";
|
||||||
|
my $escapedChannel = $dbh->quote_identifier($channel);
|
||||||
|
|
||||||
|
$listener->subscribe($channel);
|
||||||
|
|
||||||
|
is(undef, $listener->block_for_messages(0)->(), "There is no message");
|
||||||
|
|
||||||
|
$dbh->do("notify $escapedChannel, ?", undef, "hi");
|
||||||
|
my $event = $listener->block_for_messages(0)->();
|
||||||
|
is($event->{'channel'}, $channel, "The channel matches");
|
||||||
|
isnt($event->{'pid'}, undef, "The pid is set");
|
||||||
|
is($event->{'payload'}, "hi", "The payload matches");
|
||||||
|
};
|
||||||
|
|
||||||
|
done_testing;
|
Loading…
Reference in a new issue