forked from the-distro/ofborg
remove unwraps in lapin
Might help a little bit with debugging if any of these unexpected cases occur.
This commit is contained in:
parent
8cb5bfd225
commit
fb8b4249f1
|
@ -86,14 +86,18 @@ impl<W: SimpleWorker + 'static> ConsumerExt<W> for CloseOnDrop<Channel> {
|
||||||
Ok(Box::pin(async move {
|
Ok(Box::pin(async move {
|
||||||
while let Some(Ok(deliver)) = consumer.next().await {
|
while let Some(Ok(deliver)) = consumer.next().await {
|
||||||
let content_type = deliver.properties.content_type();
|
let content_type = deliver.properties.content_type();
|
||||||
let job = worker.msg_to_job(
|
let job = worker
|
||||||
|
.msg_to_job(
|
||||||
deliver.routing_key.as_str(),
|
deliver.routing_key.as_str(),
|
||||||
&content_type.as_ref().map(|s| s.to_string()),
|
&content_type.as_ref().map(|s| s.to_string()),
|
||||||
&deliver.data,
|
&deliver.data,
|
||||||
);
|
)
|
||||||
|
.expect("worker unexpected message consumed");
|
||||||
|
|
||||||
for action in worker.consumer(&job.unwrap()) {
|
for action in worker.consumer(&job) {
|
||||||
action_deliver(&self, &deliver, action).await.unwrap();
|
action_deliver(&self, &deliver, action)
|
||||||
|
.await
|
||||||
|
.expect("action deliver failure");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
|
@ -107,7 +111,8 @@ struct ChannelNotificationReceiver<'a> {
|
||||||
|
|
||||||
impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
|
impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
|
||||||
fn tell(&mut self, action: Action) {
|
fn tell(&mut self, action: Action) {
|
||||||
task::block_on(action_deliver(self.channel, self.deliver, action)).unwrap();
|
task::block_on(action_deliver(self.channel, self.deliver, action))
|
||||||
|
.expect("action deliver failure");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,13 +141,15 @@ impl<W: SimpleNotifyWorker + 'static> ConsumerExt<W> for NotifyChannel {
|
||||||
};
|
};
|
||||||
|
|
||||||
let content_type = deliver.properties.content_type();
|
let content_type = deliver.properties.content_type();
|
||||||
let job = worker.msg_to_job(
|
let job = worker
|
||||||
|
.msg_to_job(
|
||||||
deliver.routing_key.as_str(),
|
deliver.routing_key.as_str(),
|
||||||
&content_type.as_ref().map(|s| s.to_string()),
|
&content_type.as_ref().map(|s| s.to_string()),
|
||||||
&deliver.data,
|
&deliver.data,
|
||||||
);
|
)
|
||||||
|
.expect("worker unexpected message consumed");
|
||||||
|
|
||||||
worker.consumer(&job.unwrap(), &mut receiver);
|
worker.consumer(&job, &mut receiver);
|
||||||
}
|
}
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue