From fc41460034e6a1c121093bc75c64d1f05a185053 Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Fri, 29 May 2020 18:13:53 +0200 Subject: [PATCH] update lapin consumer The Stream implementation for consumers was changed to include the channel. Should be the last api change since it's released now. impl Stream for Consumer { type Item = Result<(Channel, Delivery)>; } --- ofborg/src/easylapin.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index c74ce6a..0a43af3 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -91,7 +91,8 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { FieldTable::default(), ))?; Ok(Box::pin(async move { - while let Some(Ok(deliver)) = consumer.next().await { + while let Some(Ok(item)) = consumer.next().await { + let (_channel, deliver) = item; debug!(?deliver.delivery_tag, "consumed delivery"); let content_type = deliver.properties.content_type(); let job = worker @@ -164,7 +165,8 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel { ))?; let mut chan = self.0; Ok(Box::pin(async move { - while let Some(Ok(deliver)) = consumer.next().await { + while let Some(Ok(item)) = consumer.next().await { + let (_channel, deliver) = item; debug!(?deliver.delivery_tag, "consumed delivery"); let mut receiver = ChannelNotificationReceiver { channel: &mut chan,