make amqp consume generic and split traits

This removes the final library specfic type from the traits.
This commit is contained in:
Daiderd Jordan 2020-04-27 22:39:03 +02:00
parent 98723462a0
commit 5ad80e878b
No known key found for this signature in database
GPG key ID: D02435D05B810C96
8 changed files with 38 additions and 57 deletions

View file

@ -2,7 +2,7 @@ use amqp::Basic;
use log::{info, log, warn}; use log::{info, log, warn};
use ofborg::checkout; use ofborg::checkout;
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::notifyworker; use ofborg::notifyworker;
use ofborg::tasks; use ofborg::tasks;

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;

View file

@ -1,6 +1,6 @@
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;

View file

@ -2,7 +2,7 @@ use amqp::Basic;
use log::{error, info, log}; use log::{error, info, log};
use ofborg::checkout; use ofborg::checkout;
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::stats; use ofborg::stats;
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use hyper::server::{Request, Response, Server}; use hyper::server::{Request, Response, Server};
use log::{info, log}; use log::{info, log};
use ofborg::easyamqp::TypedWrappers; use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, stats, tasks, worker}; use ofborg::{config, easyamqp, stats, tasks, worker};
use std::env; use std::env;

View file

@ -296,53 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amq
Ok(session) Ok(session)
} }
pub trait TypedWrappers { pub trait ChannelExt {
type Error; type Error;
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>;
where fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
T: amqp::Consumer + 'static;
fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<(), Self::Error>;
fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<(), Self::Error>;
fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<(), Self::Error>;
} }
impl TypedWrappers for amqp::Channel { pub trait ConsumerExt<T> {
type Error;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>;
}
impl ChannelExt for amqp::Channel {
type Error = amqp::AMQPError; type Error = amqp::AMQPError;
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
where
T: amqp::Consumer + 'static,
{
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
amqp::Table::new(),
)?;
Ok(())
}
fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<(), Self::Error> {
self.exchange_declare( self.exchange_declare(
config.exchange, config.exchange,
config.exchange_type.into(), config.exchange_type.into(),
@ -356,10 +325,7 @@ impl TypedWrappers for amqp::Channel {
Ok(()) Ok(())
} }
fn declare_queue( fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> {
&mut self,
config: QueueConfig,
) -> Result<(), Self::Error> {
self.queue_declare( self.queue_declare(
config.queue, config.queue,
config.passive, config.passive,
@ -372,10 +338,7 @@ impl TypedWrappers for amqp::Channel {
Ok(()) Ok(())
} }
fn bind_queue( fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> {
&mut self,
config: BindQueueConfig,
) -> Result<(), Self::Error> {
self.queue_bind( self.queue_bind(
config.queue, config.queue,
config.exchange, config.exchange,
@ -386,3 +349,21 @@ impl TypedWrappers for amqp::Channel {
Ok(()) Ok(())
} }
} }
impl<T: amqp::Consumer + 'static> ConsumerExt<T> for amqp::Channel {
type Error = amqp::AMQPError;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> {
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
amqp::Table::new(),
)?;
Ok(())
}
}