diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index f435ffa..c869820 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -86,14 +86,18 @@ impl ConsumerExt for CloseOnDrop { Ok(Box::pin(async move { while let Some(Ok(deliver)) = consumer.next().await { let content_type = deliver.properties.content_type(); - let job = worker.msg_to_job( - deliver.routing_key.as_str(), - &content_type.as_ref().map(|s| s.to_string()), - &deliver.data, - ); + let job = worker + .msg_to_job( + deliver.routing_key.as_str(), + &content_type.as_ref().map(|s| s.to_string()), + &deliver.data, + ) + .expect("worker unexpected message consumed"); - for action in worker.consumer(&job.unwrap()) { - action_deliver(&self, &deliver, action).await.unwrap(); + for action in worker.consumer(&job) { + action_deliver(&self, &deliver, action) + .await + .expect("action deliver failure"); } } })) @@ -107,7 +111,8 @@ struct ChannelNotificationReceiver<'a> { impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> { fn tell(&mut self, action: Action) { - task::block_on(action_deliver(self.channel, self.deliver, action)).unwrap(); + task::block_on(action_deliver(self.channel, self.deliver, action)) + .expect("action deliver failure"); } } @@ -136,13 +141,15 @@ impl ConsumerExt for NotifyChannel { }; let content_type = deliver.properties.content_type(); - let job = worker.msg_to_job( - deliver.routing_key.as_str(), - &content_type.as_ref().map(|s| s.to_string()), - &deliver.data, - ); + let job = worker + .msg_to_job( + deliver.routing_key.as_str(), + &content_type.as_ref().map(|s| s.to_string()), + &deliver.data, + ) + .expect("worker unexpected message consumed"); - worker.consumer(&job.unwrap(), &mut receiver); + worker.consumer(&job, &mut receiver); } })) }