diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index d8cd613..f608ce7 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -3,12 +3,10 @@ extern crate amqp; use std::collections::LinkedList; use std::env; -use amqp::{Consumer, Channel}; use amqp::protocol::basic::{Deliver,BasicProperties}; use std::path::Path; use amqp::Basic; -use amqp::protocol; use amqp::Session; use amqp::Table; use std::process; @@ -80,7 +78,7 @@ impl BuildWorker { impl worker::SimpleWorker for BuildWorker { type J = buildjob::BuildJob; - fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, + fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, body: &Vec) -> Result { println!("lmao I got a job?"); return match buildjob::from(body) { @@ -92,10 +90,10 @@ impl worker::SimpleWorker for BuildWorker { } } - fn consumer(&self, job: &buildjob::BuildJob) -> Result<(), Error> { + fn consumer(&self, job: &buildjob::BuildJob) -> worker::Actions { let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let co = project.clone_for("builder".to_string(), - job.pr.number.to_string())?; + job.pr.number.to_string()).unwrap(); let target_branch = match job.pr.target_branch.clone() { Some(x) => { x } @@ -146,15 +144,11 @@ impl worker::SimpleWorker for BuildWorker { let last10lines = l10.into_iter().collect::>(); - /* - resp.build_finished( - &job, - &mut channel, - success, - last10lines.clone() - ); - */ - return Ok(()) + return self.actions().build_finished( + &job, + success, + last10lines.clone() + ); } } diff --git a/ofborg/src/message/buildjob.rs b/ofborg/src/message/buildjob.rs index 1dc1497..2643c8c 100644 --- a/ofborg/src/message/buildjob.rs +++ b/ofborg/src/message/buildjob.rs @@ -1,7 +1,8 @@ use ofborg::message::{Pr,Repo}; use ofborg::message::buildresult; +use ofborg::worker; use serde_json; -use amqp::{Basic, Channel, protocol}; +use amqp::{Channel, protocol}; #[derive(Serialize, Deserialize, Debug)] pub struct BuildJob { @@ -19,7 +20,7 @@ pub struct Actions { } impl Actions { - pub fn build_finished(&mut self, job: &BuildJob, channel: &mut Channel, success: bool, lines: Vec) { + pub fn build_finished(&mut self, job: &BuildJob, success: bool, lines: Vec) -> worker::Actions { let msg = buildresult::BuildResult { repo: job.repo.clone(), pr: job.pr.clone(), @@ -34,8 +35,16 @@ impl Actions { }; - - channel.basic_publish("build-results", "", true, true, - props, serde_json::to_string(&msg).unwrap().into_bytes()).unwrap(); + return vec![ + worker::Action::Publish(worker::QueueMsg{ + exchange: Some("build-results".to_owned()), + routing_key: None, + mandatory: true, + immediate: true, + properties: Some(props), + content: serde_json::to_string(&msg).unwrap().into_bytes() + }), + worker::Action::Ack + ]; } } diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 1a7438f..0ade66e 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -2,7 +2,7 @@ use amqp::{Consumer, Channel}; use amqp::protocol::basic::{Deliver,BasicProperties}; use std::marker::Send; -use std::io::Error; + pub struct Worker { internal: T @@ -11,14 +11,28 @@ pub struct Worker { pub struct Response { } -enum Action { +pub type Actions = Vec; +pub enum Action { + Ack, + Nack, + Publish(QueueMsg), } +pub struct QueueMsg { + pub exchange: Option, + pub routing_key: Option, + pub mandatory: bool, + pub immediate: bool, + pub properties: Option, + pub content: Vec, +} + + pub trait SimpleWorker { type J; - fn consumer(&self, job: &Self::J) -> Result<(), Error>; + fn consumer(&self, job: &Self::J) -> Actions; fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, body: &Vec) -> Result; @@ -34,12 +48,18 @@ pub fn new(worker: T) -> Worker { impl Consumer for Worker { fn handle_delivery(&mut self, - channel: &mut Channel, + _: &mut Channel, method: Deliver, headers: BasicProperties, body: Vec) { let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); - self.internal.consumer(&job).unwrap(); + for action in self.internal.consumer(&job) { + match action { + Action::Ack => {} + Action::Nack => {} + Action::Publish(_) => {} + } + } } }