From 45672f87822c7851e80ee1c6c8b954d07611dbc7 Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Sun, 24 May 2020 17:22:40 +0200 Subject: [PATCH] remove rust-amqp implementations This has all been converted to the lapin implementations. --- ofborg/src/easyamqp.rs | 100 ------------------------------------- ofborg/src/notifyworker.rs | 81 ------------------------------ ofborg/src/stats.rs | 34 ------------- ofborg/src/worker.rs | 62 ----------------------- 4 files changed, 277 deletions(-) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index d2b4b13..0df4da6 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -1,9 +1,3 @@ -use crate::config::RabbitMQConfig; -use crate::ofborg; - -use amqp::Basic; -use tracing::info; - pub struct ConsumeConfig { /// Specifies the name of the queue to consume from. pub queue: String, @@ -264,39 +258,6 @@ pub struct QueueConfig { pub no_wait: bool, } -pub fn session_from_config(config: &RabbitMQConfig) -> Result { - let scheme = if config.ssl { - amqp::AMQPScheme::AMQPS - } else { - amqp::AMQPScheme::AMQP - }; - - let mut properties = amqp::Table::new(); - properties.insert( - "ofborg_version".to_owned(), - amqp::TableEntry::LongString(ofborg::VERSION.to_owned()), - ); - - let options = amqp::Options { - host: config.host.clone(), - port: match scheme { - amqp::AMQPScheme::AMQPS => 5671, - amqp::AMQPScheme::AMQP => 5672, - }, - vhost: config.virtualhost.clone().unwrap_or_else(|| "/".to_owned()), - login: config.username.clone(), - password: config.password.clone(), - scheme, - properties, - ..amqp::Options::default() - }; - - let session = amqp::Session::new(options)?; - - info!("Connected to {}", &config.host); - Ok(session) -} - pub trait ChannelExt { type Error; fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; @@ -309,64 +270,3 @@ pub trait ConsumerExt<'a, C> { type Handle; fn consume(self, callback: C, config: ConsumeConfig) -> Result; } - -impl ChannelExt for amqp::Channel { - type Error = amqp::AMQPError; - - fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { - self.exchange_declare( - config.exchange, - config.exchange_type.into(), - config.passive, - config.durable, - config.auto_delete, - config.internal, - config.no_wait, - amqp::Table::new(), - )?; - Ok(()) - } - - fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { - self.queue_declare( - config.queue, - config.passive, - config.durable, - config.exclusive, - config.auto_delete, - config.no_wait, - amqp::Table::new(), - )?; - Ok(()) - } - - fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { - self.queue_bind( - config.queue, - config.exchange, - config.routing_key.unwrap_or_else(|| "".to_owned()), - config.no_wait, - amqp::Table::new(), - )?; - Ok(()) - } -} - -impl ConsumerExt<'_, C> for amqp::Channel { - type Error = amqp::AMQPError; - type Handle = Self; - - fn consume(mut self, callback: C, config: ConsumeConfig) -> Result { - self.basic_consume( - callback, - config.queue, - config.consumer_tag, - config.no_local, - config.no_ack, - config.exclusive, - config.no_wait, - amqp::Table::new(), - )?; - Ok(self) - } -} diff --git a/ofborg/src/notifyworker.rs b/ofborg/src/notifyworker.rs index 9be12d4..c076342 100644 --- a/ofborg/src/notifyworker.rs +++ b/ofborg/src/notifyworker.rs @@ -1,14 +1,5 @@ use crate::worker::Action; -use amqp::protocol::basic::{BasicProperties, Deliver}; -use amqp::Basic; - -use std::marker::Send; - -pub struct NotifyWorker { - internal: T, -} - pub trait SimpleNotifyWorker { type J; @@ -42,75 +33,3 @@ impl NotificationReceiver for DummyNotificationReceiver { self.actions.push(action); } } - -pub struct ChannelNotificationReceiver<'a> { - channel: &'a mut amqp::Channel, - delivery_tag: u64, -} - -impl<'a> ChannelNotificationReceiver<'a> { - pub fn new( - channel: &'a mut amqp::Channel, - delivery_tag: u64, - ) -> ChannelNotificationReceiver<'a> { - ChannelNotificationReceiver { - channel, - delivery_tag, - } - } -} - -impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> { - fn tell(&mut self, action: Action) { - match action { - Action::Ack => { - self.channel.basic_ack(self.delivery_tag, false).unwrap(); - } - Action::NackRequeue => { - self.channel - .basic_nack(self.delivery_tag, false, true) - .unwrap(); - } - Action::NackDump => { - self.channel - .basic_nack(self.delivery_tag, false, false) - .unwrap(); - } - Action::Publish(mut msg) => { - let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned()); - let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned()); - - let props = BasicProperties { - content_type: msg.content_type, - delivery_mode: Some(2), // persistent - ..Default::default() - }; - self.channel - .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content) - .unwrap(); - } - } - } -} - -pub fn new(worker: T) -> NotifyWorker { - NotifyWorker { internal: worker } -} - -impl amqp::Consumer for NotifyWorker { - fn handle_delivery( - &mut self, - channel: &mut amqp::Channel, - method: Deliver, - headers: BasicProperties, - body: Vec, - ) { - let mut receiver = ChannelNotificationReceiver::new(channel, method.delivery_tag); - - let job = self - .internal - .msg_to_job(&method.routing_key, &headers.content_type, &body) - .unwrap(); - self.internal.consumer(&job, &mut receiver); - } -} diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index 18ce660..e5363ab 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -1,5 +1,3 @@ -use amqp::protocol::basic; -use amqp::Basic; use async_std::task; use lapin::options::BasicPublishOptions; @@ -26,38 +24,6 @@ pub struct RabbitMQ { channel: C, } -impl RabbitMQ { - pub fn from_amqp(identity: &str, channel: amqp::Channel) -> Self { - RabbitMQ { - identity: identity.to_owned(), - channel, - } - } -} - -impl SysEvents for RabbitMQ { - fn notify(&mut self, event: Event) { - let props = basic::BasicProperties { - ..Default::default() - }; - self.channel - .basic_publish( - String::from("stats"), - "".to_owned(), - false, - false, - props, - serde_json::to_string(&EventMessage { - sender: self.identity.clone(), - events: vec![event], - }) - .unwrap() - .into_bytes(), - ) - .unwrap(); - } -} - impl RabbitMQ { pub fn from_lapin(identity: &str, channel: lapin::Channel) -> Self { RabbitMQ { diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 960917a..bf4f3bc 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -1,13 +1,6 @@ use std::marker::Send; -use amqp::protocol::basic::{BasicProperties, Deliver}; -use amqp::Basic; use serde::Serialize; -use tracing::error; - -pub struct Worker { - internal: T, -} pub struct Response {} @@ -61,58 +54,3 @@ pub trait SimpleWorker: Send { body: &[u8], ) -> Result; } - -pub fn new(worker: T) -> Worker { - Worker { internal: worker } -} - -impl amqp::Consumer for Worker { - fn handle_delivery( - &mut self, - channel: &mut amqp::Channel, - method: Deliver, - headers: BasicProperties, - body: Vec, - ) { - let job = self - .internal - .msg_to_job(&method.routing_key, &headers.content_type, &body); - - if let Err(e) = job { - error!("Error decoding job: {:?}", e); - channel.basic_ack(method.delivery_tag, false).unwrap(); - return; - } - - for action in self.internal.consumer(&job.unwrap()) { - match action { - Action::Ack => { - channel.basic_ack(method.delivery_tag, false).unwrap(); - } - Action::NackRequeue => { - channel - .basic_nack(method.delivery_tag, false, true) - .unwrap(); - } - Action::NackDump => { - channel - .basic_nack(method.delivery_tag, false, false) - .unwrap(); - } - Action::Publish(mut msg) => { - let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned()); - let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned()); - - let props = BasicProperties { - content_type: msg.content_type, - delivery_mode: Some(2), // persistent - ..Default::default() - }; - channel - .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content) - .unwrap(); - } - } - } - } -}