diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 8251eab..57424a4 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -14,7 +14,7 @@ use amqp::Table; use ofborg::config; use ofborg::checkout; -use ofborg::worker; +use ofborg::notifyworker; use ofborg::tasks; @@ -51,7 +51,7 @@ fn main() { channel.basic_prefetch(1).unwrap(); channel.basic_consume( - worker::new(tasks::build::BuildWorker::new( + notifyworker::new(tasks::build::BuildWorker::new( cloner, nix, cfg.nix.system.clone(), diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 612d23a..a816f4a 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -42,6 +42,7 @@ pub mod ofborg { pub use locks; pub use clone; pub use worker; + pub use notifyworker; pub use message; pub use tasks; pub use evalchecker; diff --git a/ofborg/src/message/buildjob.rs b/ofborg/src/message/buildjob.rs index 1fad4d9..c5c8075 100644 --- a/ofborg/src/message/buildjob.rs +++ b/ofborg/src/message/buildjob.rs @@ -19,55 +19,3 @@ pub fn from(data: &Vec) -> Result { pub struct Actions { pub system: String, } - -impl Actions { - pub fn commit_missing(&mut self, _job: &BuildJob) -> worker::Actions { - return vec![ - worker::Action::Ack - ]; - } - - pub fn nasty_hack_linux_only(&mut self, _job: &BuildJob) -> worker::Actions { - return vec![ - worker::Action::Ack - ]; - } - - pub fn merge_failed(&mut self, job: &BuildJob) -> worker::Actions { - let msg = buildresult::BuildResult { - repo: job.repo.clone(), - pr: job.pr.clone(), - system: self.system.clone(), - output: vec![String::from("Merge failed")], - success: false - }; - - return vec![ - worker::publish_serde_action( - Some("build-results".to_owned()), - None, - &msg - ), - worker::Action::Ack - ]; - } - - pub fn build_finished(&mut self, job: &BuildJob, success: bool, lines: Vec) -> worker::Actions { - let msg = buildresult::BuildResult { - repo: job.repo.clone(), - pr: job.pr.clone(), - system: self.system.clone(), - output: lines, - success: success - }; - - return vec![ - worker::publish_serde_action( - Some("build-results".to_owned()), - None, - &msg - ), - worker::Action::Ack - ]; - } -} diff --git a/ofborg/src/tasks/build.rs b/ofborg/src/tasks/build.rs index 1ed6fec..349321e 100644 --- a/ofborg/src/tasks/build.rs +++ b/ofborg/src/tasks/build.rs @@ -9,10 +9,12 @@ use std::io::BufRead; use ofborg::checkout; use ofborg::message::buildjob; +use ofborg::message::buildresult; use ofborg::nix; use ofborg::commentparser; use ofborg::worker; +use ofborg::notifyworker; use amqp::protocol::basic::{Deliver,BasicProperties}; @@ -33,17 +35,78 @@ impl BuildWorker { }; } - fn actions(&self) -> buildjob::Actions { - return buildjob::Actions{ - system: self.system.clone(), - }; + fn actions<'a>(&self, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a> { + JobActions::new(&self.system, &self.identity, receiver) } } -impl worker::SimpleWorker for BuildWorker { +struct JobActions<'a> { + system: String, + identity: String, + receiver: &'a mut notifyworker::NotificationReceiver +} + +impl<'a> JobActions<'a> { + fn new(system: &str, identity: &str, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a> { + return JobActions { + system: system.to_owned(), + identity: system.to_owned(), + receiver: receiver, + }; + } + + pub fn commit_missing(&mut self, _job: &buildjob::BuildJob) { + self.tell(worker::Action::Ack); + } + + pub fn nasty_hack_linux_only(&mut self, _job: &buildjob::BuildJob) { + self.tell(worker::Action::Ack); + } + + pub fn merge_failed(&mut self, job: &buildjob::BuildJob) { + let msg = buildresult::BuildResult { + repo: job.repo.clone(), + pr: job.pr.clone(), + system: self.system.clone(), + output: vec![String::from("Merge failed")], + + success: false + }; + + self.tell(worker::publish_serde_action( + Some("build-results".to_owned()), + None, + &msg + )); + self.tell(worker::Action::Ack); + } + + pub fn build_finished(&mut self, job: &buildjob::BuildJob, success: bool, lines: Vec) { + let msg = buildresult::BuildResult { + repo: job.repo.clone(), + pr: job.pr.clone(), + system: self.system.clone(), + output: lines, + success: success + }; + + self.tell(worker::publish_serde_action( + Some("build-results".to_owned()), + None, + &msg + )); + self.tell(worker::Action::Ack); + } + + fn tell(&mut self, action: worker::Action) { + self.receiver.tell(action); + } +} + +impl notifyworker::SimpleNotifyWorker for BuildWorker { type J = buildjob::BuildJob; - fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties, + fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, body: &Vec) -> Result { println!("lmao I got a job?"); return match buildjob::from(body) { @@ -55,7 +118,9 @@ impl worker::SimpleWorker for BuildWorker { } } - fn consumer(&mut self, job: &buildjob::BuildJob) -> worker::Actions { + fn consumer(&self, job: &buildjob::BuildJob, notifier: &mut notifyworker::NotificationReceiver) { + let mut actions = self.actions(notifier); + info!("Working on {}", job.pr.number); let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let co = project.clone_for("builder".to_string(), @@ -76,7 +141,8 @@ impl worker::SimpleWorker for BuildWorker { if buildfile == "./nixos/release.nix" && self.system != "x86_64-linux" { // NixOS jobs get routed to all builders, even though darwin // cannot build them. - return self.actions().nasty_hack_linux_only(&job); + actions.nasty_hack_linux_only(&job); + return; } let refpath = co.checkout_origin_ref(target_branch.as_ref()).unwrap(); @@ -84,12 +150,14 @@ impl worker::SimpleWorker for BuildWorker { if !co.commit_exists(job.pr.head_sha.as_ref()) { info!("Commit {} doesn't exist", job.pr.head_sha); - return self.actions().commit_missing(&job); + actions.commit_missing(&job); + return; } if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) { info!("Failed to merge {}", job.pr.head_sha); - return self.actions().merge_failed(&job); + actions.merge_failed(&job); + return; } println!("Got path: {:?}, building", refpath); @@ -135,7 +203,7 @@ impl worker::SimpleWorker for BuildWorker { let last10lines = l10.into_iter().collect::>(); - return self.actions().build_finished( + actions.build_finished( &job, success, last10lines.clone()