forked from the-distro/ofborg
remove rust-amqp implementations
This has all been converted to the lapin implementations.
This commit is contained in:
parent
fd1b7967b1
commit
45672f8782
|
@ -1,9 +1,3 @@
|
|||
use crate::config::RabbitMQConfig;
|
||||
use crate::ofborg;
|
||||
|
||||
use amqp::Basic;
|
||||
use tracing::info;
|
||||
|
||||
pub struct ConsumeConfig {
|
||||
/// Specifies the name of the queue to consume from.
|
||||
pub queue: String,
|
||||
|
@ -264,39 +258,6 @@ pub struct QueueConfig {
|
|||
pub no_wait: bool,
|
||||
}
|
||||
|
||||
pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
|
||||
let scheme = if config.ssl {
|
||||
amqp::AMQPScheme::AMQPS
|
||||
} else {
|
||||
amqp::AMQPScheme::AMQP
|
||||
};
|
||||
|
||||
let mut properties = amqp::Table::new();
|
||||
properties.insert(
|
||||
"ofborg_version".to_owned(),
|
||||
amqp::TableEntry::LongString(ofborg::VERSION.to_owned()),
|
||||
);
|
||||
|
||||
let options = amqp::Options {
|
||||
host: config.host.clone(),
|
||||
port: match scheme {
|
||||
amqp::AMQPScheme::AMQPS => 5671,
|
||||
amqp::AMQPScheme::AMQP => 5672,
|
||||
},
|
||||
vhost: config.virtualhost.clone().unwrap_or_else(|| "/".to_owned()),
|
||||
login: config.username.clone(),
|
||||
password: config.password.clone(),
|
||||
scheme,
|
||||
properties,
|
||||
..amqp::Options::default()
|
||||
};
|
||||
|
||||
let session = amqp::Session::new(options)?;
|
||||
|
||||
info!("Connected to {}", &config.host);
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
pub trait ChannelExt {
|
||||
type Error;
|
||||
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
|
||||
|
@ -309,64 +270,3 @@ pub trait ConsumerExt<'a, C> {
|
|||
type Handle;
|
||||
fn consume(self, callback: C, config: ConsumeConfig) -> Result<Self::Handle, Self::Error>;
|
||||
}
|
||||
|
||||
impl ChannelExt for amqp::Channel {
|
||||
type Error = amqp::AMQPError;
|
||||
|
||||
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
|
||||
self.exchange_declare(
|
||||
config.exchange,
|
||||
config.exchange_type.into(),
|
||||
config.passive,
|
||||
config.durable,
|
||||
config.auto_delete,
|
||||
config.internal,
|
||||
config.no_wait,
|
||||
amqp::Table::new(),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> {
|
||||
self.queue_declare(
|
||||
config.queue,
|
||||
config.passive,
|
||||
config.durable,
|
||||
config.exclusive,
|
||||
config.auto_delete,
|
||||
config.no_wait,
|
||||
amqp::Table::new(),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> {
|
||||
self.queue_bind(
|
||||
config.queue,
|
||||
config.exchange,
|
||||
config.routing_key.unwrap_or_else(|| "".to_owned()),
|
||||
config.no_wait,
|
||||
amqp::Table::new(),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<C: amqp::Consumer + 'static> ConsumerExt<'_, C> for amqp::Channel {
|
||||
type Error = amqp::AMQPError;
|
||||
type Handle = Self;
|
||||
|
||||
fn consume(mut self, callback: C, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
|
||||
self.basic_consume(
|
||||
callback,
|
||||
config.queue,
|
||||
config.consumer_tag,
|
||||
config.no_local,
|
||||
config.no_ack,
|
||||
config.exclusive,
|
||||
config.no_wait,
|
||||
amqp::Table::new(),
|
||||
)?;
|
||||
Ok(self)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,5 @@
|
|||
use crate::worker::Action;
|
||||
|
||||
use amqp::protocol::basic::{BasicProperties, Deliver};
|
||||
use amqp::Basic;
|
||||
|
||||
use std::marker::Send;
|
||||
|
||||
pub struct NotifyWorker<T: SimpleNotifyWorker> {
|
||||
internal: T,
|
||||
}
|
||||
|
||||
pub trait SimpleNotifyWorker {
|
||||
type J;
|
||||
|
||||
|
@ -42,75 +33,3 @@ impl NotificationReceiver for DummyNotificationReceiver {
|
|||
self.actions.push(action);
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ChannelNotificationReceiver<'a> {
|
||||
channel: &'a mut amqp::Channel,
|
||||
delivery_tag: u64,
|
||||
}
|
||||
|
||||
impl<'a> ChannelNotificationReceiver<'a> {
|
||||
pub fn new(
|
||||
channel: &'a mut amqp::Channel,
|
||||
delivery_tag: u64,
|
||||
) -> ChannelNotificationReceiver<'a> {
|
||||
ChannelNotificationReceiver {
|
||||
channel,
|
||||
delivery_tag,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
|
||||
fn tell(&mut self, action: Action) {
|
||||
match action {
|
||||
Action::Ack => {
|
||||
self.channel.basic_ack(self.delivery_tag, false).unwrap();
|
||||
}
|
||||
Action::NackRequeue => {
|
||||
self.channel
|
||||
.basic_nack(self.delivery_tag, false, true)
|
||||
.unwrap();
|
||||
}
|
||||
Action::NackDump => {
|
||||
self.channel
|
||||
.basic_nack(self.delivery_tag, false, false)
|
||||
.unwrap();
|
||||
}
|
||||
Action::Publish(mut msg) => {
|
||||
let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned());
|
||||
let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned());
|
||||
|
||||
let props = BasicProperties {
|
||||
content_type: msg.content_type,
|
||||
delivery_mode: Some(2), // persistent
|
||||
..Default::default()
|
||||
};
|
||||
self.channel
|
||||
.basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new<T: SimpleNotifyWorker>(worker: T) -> NotifyWorker<T> {
|
||||
NotifyWorker { internal: worker }
|
||||
}
|
||||
|
||||
impl<T: SimpleNotifyWorker + Send> amqp::Consumer for NotifyWorker<T> {
|
||||
fn handle_delivery(
|
||||
&mut self,
|
||||
channel: &mut amqp::Channel,
|
||||
method: Deliver,
|
||||
headers: BasicProperties,
|
||||
body: Vec<u8>,
|
||||
) {
|
||||
let mut receiver = ChannelNotificationReceiver::new(channel, method.delivery_tag);
|
||||
|
||||
let job = self
|
||||
.internal
|
||||
.msg_to_job(&method.routing_key, &headers.content_type, &body)
|
||||
.unwrap();
|
||||
self.internal.consumer(&job, &mut receiver);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
use amqp::protocol::basic;
|
||||
use amqp::Basic;
|
||||
use async_std::task;
|
||||
use lapin::options::BasicPublishOptions;
|
||||
|
||||
|
@ -26,38 +24,6 @@ pub struct RabbitMQ<C> {
|
|||
channel: C,
|
||||
}
|
||||
|
||||
impl RabbitMQ<amqp::Channel> {
|
||||
pub fn from_amqp(identity: &str, channel: amqp::Channel) -> Self {
|
||||
RabbitMQ {
|
||||
identity: identity.to_owned(),
|
||||
channel,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SysEvents for RabbitMQ<amqp::Channel> {
|
||||
fn notify(&mut self, event: Event) {
|
||||
let props = basic::BasicProperties {
|
||||
..Default::default()
|
||||
};
|
||||
self.channel
|
||||
.basic_publish(
|
||||
String::from("stats"),
|
||||
"".to_owned(),
|
||||
false,
|
||||
false,
|
||||
props,
|
||||
serde_json::to_string(&EventMessage {
|
||||
sender: self.identity.clone(),
|
||||
events: vec![event],
|
||||
})
|
||||
.unwrap()
|
||||
.into_bytes(),
|
||||
)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
impl RabbitMQ<lapin::Channel> {
|
||||
pub fn from_lapin(identity: &str, channel: lapin::Channel) -> Self {
|
||||
RabbitMQ {
|
||||
|
|
|
@ -1,13 +1,6 @@
|
|||
use std::marker::Send;
|
||||
|
||||
use amqp::protocol::basic::{BasicProperties, Deliver};
|
||||
use amqp::Basic;
|
||||
use serde::Serialize;
|
||||
use tracing::error;
|
||||
|
||||
pub struct Worker<T: SimpleWorker> {
|
||||
internal: T,
|
||||
}
|
||||
|
||||
pub struct Response {}
|
||||
|
||||
|
@ -61,58 +54,3 @@ pub trait SimpleWorker: Send {
|
|||
body: &[u8],
|
||||
) -> Result<Self::J, String>;
|
||||
}
|
||||
|
||||
pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> {
|
||||
Worker { internal: worker }
|
||||
}
|
||||
|
||||
impl<T: SimpleWorker + Send> amqp::Consumer for Worker<T> {
|
||||
fn handle_delivery(
|
||||
&mut self,
|
||||
channel: &mut amqp::Channel,
|
||||
method: Deliver,
|
||||
headers: BasicProperties,
|
||||
body: Vec<u8>,
|
||||
) {
|
||||
let job = self
|
||||
.internal
|
||||
.msg_to_job(&method.routing_key, &headers.content_type, &body);
|
||||
|
||||
if let Err(e) = job {
|
||||
error!("Error decoding job: {:?}", e);
|
||||
channel.basic_ack(method.delivery_tag, false).unwrap();
|
||||
return;
|
||||
}
|
||||
|
||||
for action in self.internal.consumer(&job.unwrap()) {
|
||||
match action {
|
||||
Action::Ack => {
|
||||
channel.basic_ack(method.delivery_tag, false).unwrap();
|
||||
}
|
||||
Action::NackRequeue => {
|
||||
channel
|
||||
.basic_nack(method.delivery_tag, false, true)
|
||||
.unwrap();
|
||||
}
|
||||
Action::NackDump => {
|
||||
channel
|
||||
.basic_nack(method.delivery_tag, false, false)
|
||||
.unwrap();
|
||||
}
|
||||
Action::Publish(mut msg) => {
|
||||
let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned());
|
||||
let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned());
|
||||
|
||||
let props = BasicProperties {
|
||||
content_type: msg.content_type,
|
||||
delivery_mode: Some(2), // persistent
|
||||
..Default::default()
|
||||
};
|
||||
channel
|
||||
.basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue