implement all declare options for lapin

Many of these are actually not used, probably better to remove them.
This commit is contained in:
Daiderd Jordan 2020-04-29 22:08:23 +02:00
parent 737871ea84
commit 7dc8d407a2
No known key found for this signature in database
GPG key ID: D02435D05B810C96

View file

@ -10,8 +10,8 @@ use async_std::future::Future;
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use async_std::task; use async_std::task;
use lapin::{ use lapin::{
types::AMQPValue, message::Delivery, options::*, types::FieldTable, BasicProperties, Channel, CloseOnDrop, message::Delivery, options::*, types::AMQPValue, types::FieldTable, BasicProperties, Channel,
Connection, ConnectionProperties, ExchangeKind, CloseOnDrop, Connection, ConnectionProperties, ExchangeKind,
}; };
pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapin::Error> { pub fn from_config(cfg: &RabbitMQConfig) -> Result<CloseOnDrop<Connection>, lapin::Error> {
@ -30,8 +30,12 @@ impl ChannelExt for CloseOnDrop<Channel> {
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
let mut opts = ExchangeDeclareOptions::default(); let mut opts = ExchangeDeclareOptions::default();
// TODO all options opts.passive = config.passive;
opts.durable = config.durable; opts.durable = config.durable;
opts.auto_delete = config.auto_delete;
opts.internal = config.internal;
opts.nowait = config.no_wait;
let kind = match config.exchange_type { let kind = match config.exchange_type {
ExchangeType::Topic => ExchangeKind::Topic, ExchangeType::Topic => ExchangeKind::Topic,
ExchangeType::Fanout => ExchangeKind::Fanout, ExchangeType::Fanout => ExchangeKind::Fanout,
@ -43,19 +47,25 @@ impl ChannelExt for CloseOnDrop<Channel> {
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> {
let mut opts = QueueDeclareOptions::default(); let mut opts = QueueDeclareOptions::default();
// TODO all options opts.passive = config.passive;
opts.durable = config.durable; opts.durable = config.durable;
opts.exclusive = config.exclusive;
opts.auto_delete = config.auto_delete;
opts.nowait = config.no_wait;
task::block_on(self.queue_declare(&config.queue, opts, FieldTable::default()))?; task::block_on(self.queue_declare(&config.queue, opts, FieldTable::default()))?;
Ok(()) Ok(())
} }
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> {
// TODO all options let mut opts = QueueBindOptions::default();
opts.nowait = config.no_wait;
task::block_on(self.queue_bind( task::block_on(self.queue_bind(
&config.queue, &config.queue,
&config.exchange, &config.exchange,
&config.routing_key.unwrap_or_else(|| "".into()), &config.routing_key.unwrap_or_else(|| "".into()),
QueueBindOptions::default(), opts,
FieldTable::default(), FieldTable::default(),
))?; ))?;
Ok(()) Ok(())