diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 8fff50f..ddf89b5 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{info, log, warn}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::notifyworker; use ofborg::tasks; diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index 823fc9e..e21f885 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 2fb2804..3ee0163 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 45bca60..7a723b1 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 4ff5958..f7513b9 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -1,6 +1,6 @@ use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 7b14ea6..eb310db 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{error, info, log}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::stats; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 273c08e..3f147ad 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,7 +1,7 @@ use amqp::Basic; use hyper::server::{Request, Response, Server}; use log::{info, log}; -use ofborg::easyamqp::TypedWrappers; +use ofborg::easyamqp::{ChannelExt, ConsumerExt}; use ofborg::{config, easyamqp, stats, tasks, worker}; use std::env; diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index b059a82..d80eeec 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -296,53 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> - where - T: amqp::Consumer + 'static; - - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result<(), Self::Error>; - - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result<(), Self::Error>; - - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result<(), Self::Error>; + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>; + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; } -impl TypedWrappers for amqp::Channel { +pub trait ConsumerExt { + type Error; + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>; +} + +impl ChannelExt for amqp::Channel { type Error = amqp::AMQPError; - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> - where - T: amqp::Consumer + 'static, - { - self.basic_consume( - callback, - config.queue, - config.consumer_tag, - config.no_local, - config.no_ack, - config.exclusive, - config.no_wait, - amqp::Table::new(), - )?; - Ok(()) - } - - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result<(), Self::Error> { + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { self.exchange_declare( config.exchange, config.exchange_type.into(), @@ -356,10 +325,7 @@ impl TypedWrappers for amqp::Channel { Ok(()) } - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result<(), Self::Error> { + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { self.queue_declare( config.queue, config.passive, @@ -372,10 +338,7 @@ impl TypedWrappers for amqp::Channel { Ok(()) } - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result<(), Self::Error> { + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { self.queue_bind( config.queue, config.exchange, @@ -386,3 +349,21 @@ impl TypedWrappers for amqp::Channel { Ok(()) } } + +impl ConsumerExt for amqp::Channel { + type Error = amqp::AMQPError; + + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> { + self.basic_consume( + callback, + config.queue, + config.consumer_tag, + config.no_local, + config.no_ack, + config.exclusive, + config.no_wait, + amqp::Table::new(), + )?; + Ok(()) + } +}