From 9a8d79ed7066dbd7bb02966525decb2fb7afb131 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Sat, 18 Nov 2017 13:46:52 -0500 Subject: [PATCH] initial cut for mass rebuilds --- ofborg/src/bin/mass-rebuilder.rs | 71 ++++++++ ofborg/src/message/massrebuildjob.rs | 25 +++ ofborg/src/message/mod.rs | 2 + ofborg/src/tasks/massrebuilder.rs | 248 +++++++++++++++++++++++++++ ofborg/src/tasks/mod.rs | 1 + 5 files changed, 347 insertions(+) create mode 100644 ofborg/src/bin/mass-rebuilder.rs create mode 100644 ofborg/src/message/massrebuildjob.rs create mode 100644 ofborg/src/tasks/massrebuilder.rs diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs new file mode 100644 index 0000000..28ce42a --- /dev/null +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -0,0 +1,71 @@ +extern crate ofborg; +extern crate amqp; +extern crate env_logger; + +#[macro_use] +extern crate log; + +use std::env; +use std::path::Path; + +use ofborg::tasks; +use ofborg::config; +use ofborg::checkout; +use ofborg::nix; + +use ofborg::worker; +use amqp::Session; +use amqp::Table; +use amqp::Basic; + +fn main() { + let cfg = config::load(env::args().nth(1).unwrap().as_ref()); + + + if let Err(_) = env::var("RUST_LOG") { + env::set_var("RUST_LOG", "info"); + env_logger::init().unwrap(); + info!("Defaulting RUST_LOG environment variable to info"); + } else { + env_logger::init().unwrap(); + } + + println!("Hello, world!"); + + + let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + println!("Connected to rabbitmq"); + { + println!("About to open channel #1"); + let hbchan = session.open_channel(1).unwrap(); + + println!("Opened channel #1"); + + tasks::heartbeat::start_on_channel(hbchan, cfg.whoami()); + } + + let mut channel = session.open_channel(2).unwrap(); + + let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); + let nix = nix::new(cfg.nix.system.clone(), cfg.nix.remote.clone()); + + channel.basic_consume( + worker::new(tasks::massrebuilder::MassRebuildWorker::new(cloner, nix)), + "mass-rebuild-check-jobs", + format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(), + false, + false, + false, + false, + Table::new() + ).unwrap(); + + channel.start_consuming(); + + println!("Finished consuming?"); + + channel.close(200, "Bye").unwrap(); + println!("Closed the channel"); + session.close(200, "Good Bye"); + println!("Closed the session... EOF"); +} diff --git a/ofborg/src/message/massrebuildjob.rs b/ofborg/src/message/massrebuildjob.rs new file mode 100644 index 0000000..37c5662 --- /dev/null +++ b/ofborg/src/message/massrebuildjob.rs @@ -0,0 +1,25 @@ +use ofborg::message::{Pr,Repo}; +use ofborg::worker; +use serde_json; + + +pub fn from(data: &Vec) -> Result { + return serde_json::from_slice(&data); +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct MassRebuildJob { + pub repo: Repo, + pub pr: Pr, +} + +pub struct Actions { +} + +impl Actions { + pub fn skip(&mut self, _job: &MassRebuildJob) -> worker::Actions { + return vec![ + worker::Action::Ack + ]; + } +} diff --git a/ofborg/src/message/mod.rs b/ofborg/src/message/mod.rs index 0653a84..f9fd80d 100644 --- a/ofborg/src/message/mod.rs +++ b/ofborg/src/message/mod.rs @@ -1,6 +1,8 @@ mod common; pub mod buildjob; pub mod buildresult; +pub mod massrebuildjob; pub mod plasticheartbeat; + pub use self::common::{Pr,Repo}; diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs new file mode 100644 index 0000000..4a447cd --- /dev/null +++ b/ofborg/src/tasks/massrebuilder.rs @@ -0,0 +1,248 @@ +extern crate amqp; +extern crate env_logger; + +use std::fs::File; +use std::io::Read; +use std::io::BufRead; +use std::io::BufReader; +use std::path::PathBuf; +use ofborg::checkout; +use ofborg::message::massrebuildjob; +use ofborg::nix; + +use ofborg::worker; +use amqp::protocol::basic::{Deliver,BasicProperties}; + +pub struct MassRebuildWorker { + cloner: checkout::CachedCloner, + nix: nix::Nix, +} + +impl MassRebuildWorker { + pub fn new(cloner: checkout::CachedCloner, nix: nix::Nix) -> MassRebuildWorker { + return MassRebuildWorker{ + cloner: cloner, + nix: nix, + }; + } + + fn actions(&self) -> massrebuildjob::Actions { + return massrebuildjob::Actions{ + }; + } +} + +impl worker::SimpleWorker for MassRebuildWorker { + type J = massrebuildjob::MassRebuildJob; + + fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, + body: &Vec) -> Result { + return match massrebuildjob::from(body) { + Ok(e) => { Ok(e) } + Err(e) => { + println!("{:?}", String::from_utf8(body.clone())); + panic!("{:?}", e); + } + } + } + + fn consumer(&self, job: &massrebuildjob::MassRebuildJob) -> worker::Actions { + let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); + let co = project.clone_for("mr-est".to_string(), + job.pr.number.to_string()).unwrap(); + + let target_branch = match job.pr.target_branch.clone() { + Some(x) => { x } + None => { String::from("origin/master") } + }; + + let refpath = co.checkout_ref(target_branch.as_ref()).unwrap(); + + + let mut stdenvs = Stdenvs::new(self.nix.clone(), PathBuf::from(&refpath)); + stdenvs.identify_before(); + + co.fetch_pr(job.pr.number).unwrap(); + + if !co.commit_exists(job.pr.head_sha.as_ref()) { + info!("Commit {} doesn't exist", job.pr.head_sha); + return self.actions().skip(&job); + } + + if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) { + info!("Failed to merge {}", job.pr.head_sha); + return self.actions().skip(&job); + } + + stdenvs.identify_after(); + + println!("Got path: {:?}, building", refpath); + if !stdenvs.are_same() { + println!("Stdenvs changed? {:?}", stdenvs.changed()); + } + + return vec![]; + } +} + +enum StdenvFrom { + Before, + After +} + +#[derive(Debug)] +enum System { + X8664Darwin, + X8664Linux +} + +#[derive(Debug, PartialEq)] +struct Stdenvs { + nix: nix::Nix, + co: PathBuf, + + linux_stdenv_before: Option, + linux_stdenv_after: Option, + + darwin_stdenv_before: Option, + darwin_stdenv_after: Option, +} + +impl Stdenvs { + fn new(nix: nix::Nix, co: PathBuf) -> Stdenvs { + return Stdenvs { + nix: nix, + co: co, + + linux_stdenv_before: None, + linux_stdenv_after: None, + + darwin_stdenv_before: None, + darwin_stdenv_after: None, + } + } + + fn identify_before(&mut self) { + self.identify(System::X8664Linux, StdenvFrom::Before); + self.identify(System::X8664Darwin, StdenvFrom::Before); + } + + fn identify_after(&mut self) { + self.identify(System::X8664Linux, StdenvFrom::After); + self.identify(System::X8664Darwin, StdenvFrom::After); + } + + fn are_same(&self) -> bool { + return self.changed().len() == 0; + } + + fn changed(&self) -> Vec { + let mut changed: Vec = vec![]; + + if self.linux_stdenv_before != self.linux_stdenv_after { + changed.push(System::X8664Linux); + } + + if self.darwin_stdenv_before != self.darwin_stdenv_after { + changed.push(System::X8664Darwin); + } + + + return changed + } + + fn identify(&mut self, system: System, from: StdenvFrom) { + match (system, from) { + (System::X8664Linux, StdenvFrom::Before) => { + self.linux_stdenv_before = self.evalstdenv("x86_64-linux"); + } + (System::X8664Linux, StdenvFrom::After) => { + self.linux_stdenv_after = self.evalstdenv("x86_64-linux"); + } + + (System::X8664Darwin, StdenvFrom::Before) => { + self.darwin_stdenv_before = self.evalstdenv("x86_64-darwin"); + } + (System::X8664Darwin, StdenvFrom::After) => { + self.darwin_stdenv_after = self.evalstdenv("x86_64-darwin"); + } + } + } + + fn evalstdenv(&self, system: &str) -> Option { + let result = self.nix.with_system(system.to_owned()).safely( + "nix-instantiate", &self.co, vec![ + String::from("."), + String::from("-A"), + String::from("stdenv"), + ] + ); + + println!("{:?}", result); + + return match result { + Ok(mut out) => { + file_to_drv(&mut out) + } + Err(mut out) => { + println!("{:?}", file_to_str(&mut out)); + None + } + } + } +} + +fn file_to_drv(f: &mut File) -> Option { + let r = BufReader::new(f); + let matches: Vec; + matches = r.lines().filter_map(|x| + match x { + Ok(line) => { + if !line.starts_with("/nix/store/") { + debug!("Skipping line, not /nix/store: {}", line); + return None + } + + if !line.ends_with(".drv") { + debug!("Skipping line, not .drv: {}", line); + return None + } + + return Some(line) + } + Err(_) => None + }).collect(); + + if matches.len() == 1 { + return Some(matches.first().unwrap().clone()); + } else { + info!("Got wrong number of matches: {}", matches.len()); + info!("Matches: {:?}", matches); + return None + } +} + +fn file_to_str(f: &mut File) -> String { + let mut buffer = Vec::new(); + f.read_to_end(&mut buffer).expect("Reading eval output"); + return String::from(String::from_utf8_lossy(&buffer)); +} + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn stdenv_checking() { + let nix = nix::new(String::from("x86_64-linux"), String::from("daemon")); + let mut stdenv = Stdenvs::new(nix.clone(), PathBuf::from("/nix/var/nix/profiles/per-user/root/channels/nixos/nixpkgs")); + stdenv.identify(System::X8664Linux, StdenvFrom::Before); + stdenv.identify(System::X8664Darwin, StdenvFrom::Before); + + stdenv.identify(System::X8664Linux, StdenvFrom::After); + stdenv.identify(System::X8664Darwin, StdenvFrom::After); + + assert!(stdenv.are_same()); + } +} diff --git a/ofborg/src/tasks/mod.rs b/ofborg/src/tasks/mod.rs index fe92915..e0794d5 100644 --- a/ofborg/src/tasks/mod.rs +++ b/ofborg/src/tasks/mod.rs @@ -1,4 +1,5 @@ pub mod heartbeat; pub mod build; +pub mod massrebuilder; pub mod buildfilter;