update lapin consumer

The Stream implementation for consumers was changed to include the
channel.  Should be the last api change since it's released now.

    impl Stream for Consumer {
        type Item = Result<(Channel, Delivery)>;
    }
This commit is contained in:
Daiderd Jordan 2020-05-29 18:13:53 +02:00 committed by Cole Helbling
parent 350e2855b2
commit fc41460034

View file

@ -91,7 +91,8 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {
FieldTable::default(), FieldTable::default(),
))?; ))?;
Ok(Box::pin(async move { Ok(Box::pin(async move {
while let Some(Ok(deliver)) = consumer.next().await { while let Some(Ok(item)) = consumer.next().await {
let (_channel, deliver) = item;
debug!(?deliver.delivery_tag, "consumed delivery"); debug!(?deliver.delivery_tag, "consumed delivery");
let content_type = deliver.properties.content_type(); let content_type = deliver.properties.content_type();
let job = worker let job = worker
@ -164,7 +165,8 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
))?; ))?;
let mut chan = self.0; let mut chan = self.0;
Ok(Box::pin(async move { Ok(Box::pin(async move {
while let Some(Ok(deliver)) = consumer.next().await { while let Some(Ok(item)) = consumer.next().await {
let (_channel, deliver) = item;
debug!(?deliver.delivery_tag, "consumed delivery"); debug!(?deliver.delivery_tag, "consumed delivery");
let mut receiver = ChannelNotificationReceiver { let mut receiver = ChannelNotificationReceiver {
channel: &mut chan, channel: &mut chan,