diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 3868335..3d40904 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -14,7 +14,7 @@ use async_std::stream::StreamExt; use async_std::task; use lapin::message::Delivery; use lapin::options::{ - BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, + BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, }; use lapin::types::{AMQPValue, FieldTable}; @@ -85,6 +85,8 @@ impl ConsumerExt for CloseOnDrop { type Handle = Pin + 'static>>; fn consume(self, mut worker: W, config: ConsumeConfig) -> Result { + task::block_on(self.basic_qos(1, BasicQosOptions::default()))?; + let mut consumer = task::block_on(self.basic_consume( &config.queue, &config.consumer_tag, @@ -133,6 +135,8 @@ impl ConsumerExt for NotifyChannel { type Handle = Pin + 'static>>; fn consume(self, worker: W, config: ConsumeConfig) -> Result { + task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; + let mut consumer = task::block_on(self.0.basic_consume( &config.queue, &config.consumer_tag,