diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index ddf89b5..5acd17d 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -81,7 +81,7 @@ fn main() { }) .unwrap(); - channel + let mut channel = channel .consume( notifyworker::new(tasks::build::BuildWorker::new( cloner, diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index e21f885..96e65ce 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -62,7 +62,7 @@ fn main() { .unwrap(); channel.basic_prefetch(1).unwrap(); - channel + let mut channel = channel .consume( worker::new(tasks::evaluationfilter::EvaluationFilterWorker::new( cfg.acl(), diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 3ee0163..c872756 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -62,7 +62,7 @@ fn main() { .unwrap(); channel.basic_prefetch(1).unwrap(); - channel + let mut channel = channel .consume( worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new( cfg.acl(), diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 7a723b1..cca3925 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -47,7 +47,7 @@ fn main() { .unwrap(); channel.basic_prefetch(1).unwrap(); - channel + let mut channel = channel .consume( worker::new(tasks::githubcommentposter::GitHubCommentPoster::new( cfg.github_app_vendingmachine(), diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index f7513b9..b49a809 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -49,7 +49,7 @@ fn main() { }) .unwrap(); - channel + let mut channel = channel .consume( worker::new(tasks::log_message_collector::LogMessageCollector::new( PathBuf::from(cfg.log_storage.clone().unwrap().path), diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index eb310db..353afe6 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -65,7 +65,7 @@ fn main() { .unwrap(); channel.basic_prefetch(1).unwrap(); - channel + let mut channel = channel .consume( worker::new(mrw), easyamqp::ConsumeConfig { diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 3f147ad..a835cbf 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -59,7 +59,7 @@ fn main() { .unwrap(); channel.basic_prefetch(1).unwrap(); - channel + let mut channel = channel .consume( worker::new(collector), easyamqp::ConsumeConfig { diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index d80eeec..3104e24 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -303,9 +303,10 @@ pub trait ChannelExt { fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; } -pub trait ConsumerExt { +pub trait ConsumerExt { type Error; - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>; + type Handle; + fn consume(self, callback: C, config: ConsumeConfig) -> Result; } impl ChannelExt for amqp::Channel { @@ -350,10 +351,11 @@ impl ChannelExt for amqp::Channel { } } -impl ConsumerExt for amqp::Channel { +impl ConsumerExt for amqp::Channel { type Error = amqp::AMQPError; + type Handle = Self; - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> { + fn consume(mut self, callback: C, config: ConsumeConfig) -> Result { self.basic_consume( callback, config.queue, @@ -364,6 +366,6 @@ impl ConsumerExt for amqp::Channel { config.no_wait, amqp::Table::new(), )?; - Ok(()) + Ok(self) } } diff --git a/ofborg/src/tasks/evaluate.rs b/ofborg/src/tasks/evaluate.rs index 4fbf22f..751e48c 100644 --- a/ofborg/src/tasks/evaluate.rs +++ b/ofborg/src/tasks/evaluate.rs @@ -11,7 +11,6 @@ use crate::systems; use crate::tasks::eval; use crate::worker; -use amqp::protocol::basic::{BasicProperties, Deliver}; use hubcaps::checks::CheckRunOptions; use hubcaps::gists::Gists; use hubcaps::issues::Issue; @@ -60,12 +59,7 @@ impl EvaluationWorker { impl worker::SimpleWorker for EvaluationWorker { type J = evaluationjob::EvaluationJob; - fn msg_to_job( - &mut self, - _: &Deliver, - _: &BasicProperties, - body: &[u8], - ) -> Result { + fn msg_to_job(&mut self, _: &str, _: &Option, body: &[u8]) -> Result { self.events.notify(Event::JobReceived); match evaluationjob::from(body) { Ok(e) => { diff --git a/ofborg/src/tasks/evaluationfilter.rs b/ofborg/src/tasks/evaluationfilter.rs index 886e65a..717b414 100644 --- a/ofborg/src/tasks/evaluationfilter.rs +++ b/ofborg/src/tasks/evaluationfilter.rs @@ -3,8 +3,6 @@ use crate::ghevent; use crate::message::{evaluationjob, Pr, Repo}; use crate::worker; -use amqp::protocol::basic::{BasicProperties, Deliver}; - pub struct EvaluationFilterWorker { acl: acl::ACL, } @@ -20,8 +18,8 @@ impl worker::SimpleWorker for EvaluationFilterWorker { fn msg_to_job( &mut self, - _: &Deliver, - _: &BasicProperties, + _: &str, + _: &Option, body: &[u8], ) -> Result { match serde_json::from_slice(body) { diff --git a/ofborg/src/tasks/githubcommentfilter.rs b/ofborg/src/tasks/githubcommentfilter.rs index f0e1412..239fca7 100644 --- a/ofborg/src/tasks/githubcommentfilter.rs +++ b/ofborg/src/tasks/githubcommentfilter.rs @@ -4,7 +4,6 @@ use crate::ghevent; use crate::message::{buildjob, evaluationjob, Pr, Repo}; use crate::worker; -use amqp::protocol::basic::{BasicProperties, Deliver}; use uuid::Uuid; pub struct GitHubCommentWorker { @@ -21,12 +20,7 @@ impl GitHubCommentWorker { impl worker::SimpleWorker for GitHubCommentWorker { type J = ghevent::IssueComment; - fn msg_to_job( - &mut self, - _: &Deliver, - _: &BasicProperties, - body: &[u8], - ) -> Result { + fn msg_to_job(&mut self, _: &str, _: &Option, body: &[u8]) -> Result { match serde_json::from_slice(body) { Ok(e) => Ok(e), Err(e) => { diff --git a/ofborg/src/tasks/githubcommentposter.rs b/ofborg/src/tasks/githubcommentposter.rs index 20eee6e..82fea77 100644 --- a/ofborg/src/tasks/githubcommentposter.rs +++ b/ofborg/src/tasks/githubcommentposter.rs @@ -4,7 +4,6 @@ use crate::message::buildresult::{BuildResult, BuildStatus, LegacyBuildResult}; use crate::message::Repo; use crate::worker; -use amqp::protocol::basic::{BasicProperties, Deliver}; use chrono::{DateTime, Utc}; use hubcaps::checks::{CheckRunOptions, CheckRunState, Conclusion, Output}; @@ -42,12 +41,7 @@ impl PostableEvent { impl worker::SimpleWorker for GitHubCommentPoster { type J = PostableEvent; - fn msg_to_job( - &mut self, - _: &Deliver, - _: &BasicProperties, - body: &[u8], - ) -> Result { + fn msg_to_job(&mut self, _: &str, _: &Option, body: &[u8]) -> Result { PostableEvent::from(body) } diff --git a/ofborg/src/tasks/log_message_collector.rs b/ofborg/src/tasks/log_message_collector.rs index fe67426..38a0bd1 100644 --- a/ofborg/src/tasks/log_message_collector.rs +++ b/ofborg/src/tasks/log_message_collector.rs @@ -3,7 +3,6 @@ use crate::message::buildresult::BuildResult; use crate::worker; use crate::writetoline::LineWriter; -use amqp::protocol::basic::{BasicProperties, Deliver}; use lru_cache::LruCache; use std::fs::{self, File, OpenOptions}; @@ -174,8 +173,8 @@ impl worker::SimpleWorker for LogMessageCollector { fn msg_to_job( &mut self, - deliver: &Deliver, - _: &BasicProperties, + routing_key: &str, + _: &Option, body: &[u8], ) -> Result { let message: MsgType; @@ -203,7 +202,7 @@ impl worker::SimpleWorker for LogMessageCollector { Ok(LogMessage { from: LogFrom { - routing_key: deliver.routing_key.clone(), + routing_key: routing_key.to_string(), attempt_id, }, message, diff --git a/ofborg/src/tasks/statscollector.rs b/ofborg/src/tasks/statscollector.rs index a28fa11..84e733e 100644 --- a/ofborg/src/tasks/statscollector.rs +++ b/ofborg/src/tasks/statscollector.rs @@ -1,8 +1,6 @@ use crate::stats; use crate::worker; -use amqp::protocol::basic::{BasicProperties, Deliver}; - pub struct StatCollectorWorker { events: E, collector: stats::MetricCollector, @@ -17,12 +15,7 @@ impl StatCollectorWorker { impl worker::SimpleWorker for StatCollectorWorker { type J = stats::EventMessage; - fn msg_to_job( - &mut self, - _: &Deliver, - _: &BasicProperties, - body: &[u8], - ) -> Result { + fn msg_to_job(&mut self, _: &str, _: &Option, body: &[u8]) -> Result { match serde_json::from_slice(body) { Ok(e) => Ok(e), Err(_) => { diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index afa4069..51c83c8 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -1,5 +1,5 @@ use amqp::protocol::basic::{BasicProperties, Deliver}; -use amqp::{Basic, Channel, Consumer}; +use amqp::Basic; use serde::Serialize; use std::marker::Send; @@ -53,15 +53,15 @@ where })) } -pub trait SimpleWorker: Send + 'static { +pub trait SimpleWorker: Send { type J: Send; fn consumer(&mut self, job: &Self::J) -> Actions; fn msg_to_job( &mut self, - method: &Deliver, - headers: &BasicProperties, + method: &str, + headers: &Option, body: &[u8], ) -> Result; } @@ -70,15 +70,17 @@ pub fn new(worker: T) -> Worker { Worker { internal: worker } } -impl Consumer for Worker { +impl amqp::Consumer for Worker { fn handle_delivery( &mut self, - channel: &mut Channel, + channel: &mut amqp::Channel, method: Deliver, headers: BasicProperties, body: Vec, ) { - let job = self.internal.msg_to_job(&method, &headers, &body); + let job = self + .internal + .msg_to_job(&method.routing_key, &headers.content_type, &body); if let Err(e) = job { error!("Error decoding job: {:?}", e);