limit prefetch count to one for lapin channels

Otherwise the consumer will pull all messages in the queue so other
instances don't have a chance to share the load.
This commit is contained in:
Daiderd Jordan 2020-04-30 19:19:28 +02:00
parent 7d1014f1c2
commit e0fce0e2fe
No known key found for this signature in database
GPG key ID: D02435D05B810C96

View file

@ -14,7 +14,7 @@ use async_std::stream::StreamExt;
use async_std::task; use async_std::task;
use lapin::message::Delivery; use lapin::message::Delivery;
use lapin::options::{ use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
}; };
use lapin::types::{AMQPValue, FieldTable}; use lapin::types::{AMQPValue, FieldTable};
@ -85,6 +85,8 @@ impl<W: SimpleWorker + 'static> ConsumerExt<W> for CloseOnDrop<Channel> {
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>; type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> { fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
task::block_on(self.basic_qos(1, BasicQosOptions::default()))?;
let mut consumer = task::block_on(self.basic_consume( let mut consumer = task::block_on(self.basic_consume(
&config.queue, &config.queue,
&config.consumer_tag, &config.consumer_tag,
@ -133,6 +135,8 @@ impl<W: SimpleNotifyWorker + 'static> ConsumerExt<W> for NotifyChannel {
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>; type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> { fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?;
let mut consumer = task::block_on(self.0.basic_consume( let mut consumer = task::block_on(self.0.basic_consume(
&config.queue, &config.queue,
&config.consumer_tag, &config.consumer_tag,