diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index 76bf5b0..1c5c032 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -56,7 +56,7 @@ fn main() -> Result<(), Box> { no_wait: false, })?; - let handle = chan.consume( + let handle = easylapin::WorkerChannel(chan).consume( tasks::evaluationfilter::EvaluationFilterWorker::new(cfg.acl()), easyamqp::ConsumeConfig { queue: queue_name.clone(), diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 31a90b4..acee59b 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -57,7 +57,7 @@ fn main() -> Result<(), Box> { no_wait: false, })?; - let handle = chan.consume( + let handle = easylapin::WorkerChannel(chan).consume( tasks::githubcommentfilter::GitHubCommentWorker::new(cfg.acl(), cfg.github()), easyamqp::ConsumeConfig { queue: "build-inputs".to_owned(), diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 4e623b8..0521db2 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -46,7 +46,7 @@ fn main() -> Result<(), Box> { no_wait: false, })?; - let handle = chan.consume( + let handle = easylapin::WorkerChannel(chan).consume( tasks::githubcommentposter::GitHubCommentPoster::new(cfg.github_app_vendingmachine()), easyamqp::ConsumeConfig { queue: "build-results".to_owned(), diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 175eeb0..c6d914a 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -48,6 +48,7 @@ fn main() -> Result<(), Box> { no_wait: false, })?; + // Regular channel, we want prefetching here. let handle = chan.consume( tasks::log_message_collector::LogMessageCollector::new( PathBuf::from(cfg.log_storage.clone().unwrap().path), diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index d72f171..d43cca6 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -53,7 +53,7 @@ fn main() -> Result<(), Box> { no_wait: false, })?; - let handle = chan.consume( + let handle = easylapin::WorkerChannel(chan).consume( tasks::evaluate::EvaluationWorker::new( cloner, &nix, diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index d6fa113..1820529 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -86,8 +86,6 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop { type Handle = Pin + 'a>>; fn consume(self, mut worker: W, config: ConsumeConfig) -> Result { - task::block_on(self.basic_qos(1, BasicQosOptions::default()))?; - let mut consumer = task::block_on(self.basic_consume( &config.queue, &config.consumer_tag, @@ -117,6 +115,20 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for CloseOnDrop { } } +// Same as a regular channel, but without prefetching, +// used for services with multiple instances. +pub struct WorkerChannel(pub CloseOnDrop); + +impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for WorkerChannel { + type Error = lapin::Error; + type Handle = Pin + 'a>>; + + fn consume(self, worker: W, config: ConsumeConfig) -> Result { + task::block_on(self.0.basic_qos(1, BasicQosOptions::default()))?; + self.0.consume(worker, config) + } +} + struct ChannelNotificationReceiver<'a> { channel: &'a mut CloseOnDrop, deliver: &'a Delivery,