Merge pull request #477 from LnL7/lapin-qos
limit prefetch count to one for lapin channels
This commit is contained in:
commit
c3c0ee723e
|
@ -14,7 +14,7 @@ use async_std::stream::StreamExt;
|
||||||
use async_std::task;
|
use async_std::task;
|
||||||
use lapin::message::Delivery;
|
use lapin::message::Delivery;
|
||||||
use lapin::options::{
|
use lapin::options::{
|
||||||
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions,
|
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
|
||||||
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
|
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
|
||||||
};
|
};
|
||||||
use lapin::types::{AMQPValue, FieldTable};
|
use lapin::types::{AMQPValue, FieldTable};
|
||||||
|
@ -85,6 +85,8 @@ impl<W: SimpleWorker + 'static> ConsumerExt<W> for CloseOnDrop<Channel> {
|
||||||
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
|
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
|
||||||
|
|
||||||
fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
|
fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
|
||||||
|
task::block_on(self.basic_qos(1, BasicQosOptions::default()))?;
|
||||||
|
|
||||||
let mut consumer = task::block_on(self.basic_consume(
|
let mut consumer = task::block_on(self.basic_consume(
|
||||||
&config.queue,
|
&config.queue,
|
||||||
&config.consumer_tag,
|
&config.consumer_tag,
|
||||||
|
@ -133,6 +135,8 @@ impl<W: SimpleNotifyWorker + 'static> ConsumerExt<W> for NotifyChannel {
|
||||||
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
|
type Handle = Pin<Box<dyn Future<Output = ()> + 'static>>;
|
||||||
|
|
||||||
fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
|
fn consume(self, worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
|
||||||
|
task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?;
|
||||||
|
|
||||||
let mut consumer = task::block_on(self.0.basic_consume(
|
let mut consumer = task::block_on(self.0.basic_consume(
|
||||||
&config.queue,
|
&config.queue,
|
||||||
&config.consumer_tag,
|
&config.consumer_tag,
|
||||||
|
|
Loading…
Reference in a new issue