diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 11c4a66..4cbe852 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -10,8 +10,8 @@ use async_std::future::Future; use async_std::stream::StreamExt; use async_std::task; use lapin::{ - types::AMQPValue, message::Delivery, options::*, types::FieldTable, BasicProperties, Channel, CloseOnDrop, - Connection, ConnectionProperties, ExchangeKind, + message::Delivery, options::*, types::AMQPValue, types::FieldTable, BasicProperties, Channel, + CloseOnDrop, Connection, ConnectionProperties, ExchangeKind, }; pub fn from_config(cfg: &RabbitMQConfig) -> Result, lapin::Error> { @@ -30,8 +30,12 @@ impl ChannelExt for CloseOnDrop { fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { let mut opts = ExchangeDeclareOptions::default(); - // TODO all options + opts.passive = config.passive; opts.durable = config.durable; + opts.auto_delete = config.auto_delete; + opts.internal = config.internal; + opts.nowait = config.no_wait; + let kind = match config.exchange_type { ExchangeType::Topic => ExchangeKind::Topic, ExchangeType::Fanout => ExchangeKind::Fanout, @@ -43,19 +47,25 @@ impl ChannelExt for CloseOnDrop { fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { let mut opts = QueueDeclareOptions::default(); - // TODO all options + opts.passive = config.passive; opts.durable = config.durable; + opts.exclusive = config.exclusive; + opts.auto_delete = config.auto_delete; + opts.nowait = config.no_wait; + task::block_on(self.queue_declare(&config.queue, opts, FieldTable::default()))?; Ok(()) } fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { - // TODO all options + let mut opts = QueueBindOptions::default(); + opts.nowait = config.no_wait; + task::block_on(self.queue_bind( &config.queue, &config.exchange, &config.routing_key.unwrap_or_else(|| "".into()), - QueueBindOptions::default(), + opts, FieldTable::default(), ))?; Ok(())