This commit is contained in:
Graham Christensen 2018-01-21 14:17:25 -05:00
parent b1aa41b2de
commit e66776cee8
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
32 changed files with 927 additions and 758 deletions

View file

@ -5,9 +5,7 @@ pub struct ACL {
impl ACL { impl ACL {
pub fn new(authorized_users: Vec<String>) -> ACL { pub fn new(authorized_users: Vec<String>) -> ACL {
return ACL { return ACL { authorized_users: authorized_users };
authorized_users: authorized_users,
}
} }
pub fn can_build(&self, user: &str, repo: &str) -> bool { pub fn can_build(&self, user: &str, repo: &str) -> bool {

View file

@ -5,7 +5,7 @@ use std::process::ExitStatus;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::process::Command; use std::process::Command;
use std::io::Read; use std::io::Read;
use std::sync::mpsc::{Sender,Receiver}; use std::sync::mpsc::{Sender, Receiver};
use std::io::BufReader; use std::io::BufReader;
use std::io::BufRead; use std::io::BufRead;
use std::process::Child; use std::process::Child;
@ -26,7 +26,7 @@ pub struct SpawnedAsyncCmd {
fn reader_tx<R: 'static + Read + Send>(read: R, tx: Sender<String>) -> thread::JoinHandle<()> { fn reader_tx<R: 'static + Read + Send>(read: R, tx: Sender<String>) -> thread::JoinHandle<()> {
let read = BufReader::new(read); let read = BufReader::new(read);
thread::spawn(move|| { thread::spawn(move || {
for line in read.lines() { for line in read.lines() {
if let Ok(line) = line { if let Ok(line) = line {
// println!("sending: {:?}", line); // println!("sending: {:?}", line);
@ -40,9 +40,7 @@ fn reader_tx<R: 'static + Read + Send>(read: R, tx: Sender<String>) -> thread::J
impl AsyncCmd { impl AsyncCmd {
pub fn new(cmd: Command) -> AsyncCmd { pub fn new(cmd: Command) -> AsyncCmd {
AsyncCmd { AsyncCmd { command: cmd }
command: cmd,
}
} }
pub fn spawn(mut self) -> SpawnedAsyncCmd { pub fn spawn(mut self) -> SpawnedAsyncCmd {
@ -55,10 +53,8 @@ impl AsyncCmd {
let (tx, rx) = channel(); let (tx, rx) = channel();
let stderr_handler = reader_tx(child.stderr.take().unwrap(), let stderr_handler = reader_tx(child.stderr.take().unwrap(), tx.clone());
tx.clone()); let stdout_handler = reader_tx(child.stdout.take().unwrap(), tx.clone());
let stdout_handler = reader_tx(child.stdout.take().unwrap(),
tx.clone());
SpawnedAsyncCmd { SpawnedAsyncCmd {
stdout_handler: stdout_handler, stdout_handler: stdout_handler,

View file

@ -51,29 +51,32 @@ fn main() {
} }
&None => { &None => {
warn!("Please define feedback.full_logs in your configuration to true or false!"); warn!("Please define feedback.full_logs in your configuration to true or false!");
warn!("feedback.full_logs when true will cause the full build log to be sent back to the server, and be viewable by everyone."); warn!("feedback.full_logs when true will cause the full build log to be sent back");
warn!("to the server, and be viewable by everyone.");
warn!("I strongly encourage everybody turn this on!"); warn!("I strongly encourage everybody turn this on!");
full_logs = false; full_logs = false;
} }
} }
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel.basic_consume( channel
notifyworker::new(tasks::build::BuildWorker::new( .basic_consume(
cloner, notifyworker::new(tasks::build::BuildWorker::new(
nix, cloner,
cfg.nix.system.clone(), nix,
cfg.runner.identity.clone(), cfg.nix.system.clone(),
full_logs, cfg.runner.identity.clone(),
)), full_logs,
format!("build-inputs-{}", cfg.nix.system.clone()).as_ref(), )),
format!("{}-builder", cfg.whoami()).as_ref(), format!("build-inputs-{}", cfg.nix.system.clone()).as_ref(),
false, format!("{}-builder", cfg.whoami()).as_ref(),
false, false,
false, false,
false, false,
Table::new() false,
).unwrap(); Table::new(),
)
.unwrap();
channel.start_consuming(); channel.start_consuming();

View file

@ -39,19 +39,21 @@ fn main() {
let mut channel = session.open_channel(2).unwrap(); let mut channel = session.open_channel(2).unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel.basic_consume( channel
worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new( .basic_consume(
cfg.acl(), worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new(
cfg.github() cfg.acl(),
)), cfg.github(),
"build-inputs", )),
format!("{}-github-comment-filter", cfg.whoami()).as_ref(), "build-inputs",
false, format!("{}-github-comment-filter", cfg.whoami()).as_ref(),
false, false,
false, false,
false, false,
Table::new() false,
).unwrap(); Table::new(),
)
.unwrap();
channel.start_consuming(); channel.start_consuming();

View file

@ -45,20 +45,22 @@ fn main() {
nix, nix,
cfg.github(), cfg.github(),
cfg.runner.identity.clone(), cfg.runner.identity.clone(),
events events,
); );
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel.basic_consume( channel
worker::new(mrw), .basic_consume(
"mass-rebuild-check-jobs", worker::new(mrw),
format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(), "mass-rebuild-check-jobs",
false, format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(),
false, false,
false, false,
false, false,
Table::new() false,
).unwrap(); Table::new(),
)
.unwrap();
channel.start_consuming(); channel.start_consuming();

View file

@ -17,9 +17,15 @@ fn main() {
let nix = cfg.nix(); let nix = cfg.nix();
match nix.safely_build_attrs(&Path::new("./"), "./default.nix", vec![String::from("hello"),]) { match nix.safely_build_attrs(
Ok(mut out) => { print!("{}", file_to_str(&mut out)); } &Path::new("./"),
Err(mut out) => { print!("{}", file_to_str(&mut out)) } "./default.nix",
vec![String::from("hello")],
) {
Ok(mut out) => {
print!("{}", file_to_str(&mut out));
}
Err(mut out) => print!("{}", file_to_str(&mut out)),
} }
} }

View file

@ -1,7 +1,7 @@
use std::path::{Path,PathBuf}; use std::path::{Path, PathBuf};
use md5; use md5;
use std::fs; use std::fs;
use std::io::{Error,ErrorKind}; use std::io::{Error, ErrorKind};
use ofborg::clone; use ofborg::clone;
use ofborg::clone::GitClonable; use ofborg::clone::GitClonable;
use std::ffi::OsStr; use std::ffi::OsStr;
@ -9,18 +9,16 @@ use std::ffi::OsString;
use std::process::Command; use std::process::Command;
pub struct CachedCloner { pub struct CachedCloner {
root: PathBuf root: PathBuf,
} }
pub fn cached_cloner(path: &Path) -> CachedCloner { pub fn cached_cloner(path: &Path) -> CachedCloner {
return CachedCloner{ return CachedCloner { root: path.to_path_buf() };
root: path.to_path_buf()
}
} }
pub struct CachedProject { pub struct CachedProject {
root: PathBuf, root: PathBuf,
clone_url: String clone_url: String,
} }
pub struct CachedProjectCo { pub struct CachedProjectCo {
@ -41,10 +39,10 @@ impl CachedCloner {
new_root.push("repo"); new_root.push("repo");
new_root.push(format!("{:x}", md5::compute(&name))); new_root.push(format!("{:x}", md5::compute(&name)));
return CachedProject{ return CachedProject {
root: new_root, root: new_root,
clone_url: clone_url clone_url: clone_url,
} };
} }
} }
@ -55,12 +53,12 @@ impl CachedProject {
let mut new_root = self.root.clone(); let mut new_root = self.root.clone();
new_root.push(use_category); new_root.push(use_category);
return Ok(CachedProjectCo{ return Ok(CachedProjectCo {
root: new_root, root: new_root,
id: id, id: id,
clone_url: self.clone_from().clone(), clone_url: self.clone_from().clone(),
local_reference: self.clone_to().clone(), local_reference: self.clone_to().clone(),
}) });
} }
fn prefetch_cache(&self) -> Result<PathBuf, Error> { fn prefetch_cache(&self) -> Result<PathBuf, Error> {
@ -91,10 +89,10 @@ impl CachedProjectCo {
// let build_dir = self.build_dir(); // let build_dir = self.build_dir();
return Ok(self.clone_to().to_str().unwrap().to_string()) return Ok(self.clone_to().to_str().unwrap().to_string());
} }
pub fn fetch_pr(&self, pr_id: u64) -> Result<(),Error> { pub fn fetch_pr(&self, pr_id: u64) -> Result<(), Error> {
let mut lock = self.lock()?; let mut lock = self.lock()?;
let result = Command::new("git") let result = Command::new("git")
@ -107,7 +105,7 @@ impl CachedProjectCo {
lock.unlock(); lock.unlock();
if result.success() { if result.success() {
return Ok(()) return Ok(());
} else { } else {
return Err(Error::new(ErrorKind::Other, "Failed to fetch PR")); return Err(Error::new(ErrorKind::Other, "Failed to fetch PR"));
} }
@ -144,7 +142,7 @@ impl CachedProjectCo {
lock.unlock(); lock.unlock();
if result.success() { if result.success() {
return Ok(()) return Ok(());
} else { } else {
return Err(Error::new(ErrorKind::Other, "Failed to merge")); return Err(Error::new(ErrorKind::Other, "Failed to merge"));
} }
@ -153,19 +151,19 @@ impl CachedProjectCo {
impl clone::GitClonable for CachedProjectCo { impl clone::GitClonable for CachedProjectCo {
fn clone_from(&self) -> String { fn clone_from(&self) -> String {
return self.clone_url.clone() return self.clone_url.clone();
} }
fn clone_to(&self) -> PathBuf { fn clone_to(&self) -> PathBuf {
let mut clone_path = self.root.clone(); let mut clone_path = self.root.clone();
clone_path.push(&self.id); clone_path.push(&self.id);
return clone_path return clone_path;
} }
fn lock_path(&self) -> PathBuf { fn lock_path(&self) -> PathBuf {
let mut lock_path = self.root.clone(); let mut lock_path = self.root.clone();
lock_path.push(format!("{}.lock", self.id)); lock_path.push(format!("{}.lock", self.id));
return lock_path return lock_path;
} }
fn extra_clone_args(&self) -> Vec<&OsStr> { fn extra_clone_args(&self) -> Vec<&OsStr> {
@ -174,30 +172,28 @@ impl clone::GitClonable for CachedProjectCo {
OsStr::new("--shared"), OsStr::new("--shared"),
OsStr::new("--reference-if-able"), OsStr::new("--reference-if-able"),
local_ref, local_ref,
] ];
} }
} }
impl clone::GitClonable for CachedProject { impl clone::GitClonable for CachedProject {
fn clone_from(&self) -> String { fn clone_from(&self) -> String {
return self.clone_url.clone() return self.clone_url.clone();
} }
fn clone_to(&self) -> PathBuf { fn clone_to(&self) -> PathBuf {
let mut clone_path = self.root.clone(); let mut clone_path = self.root.clone();
clone_path.push("clone"); clone_path.push("clone");
return clone_path return clone_path;
} }
fn lock_path(&self) -> PathBuf { fn lock_path(&self) -> PathBuf {
let mut clone_path = self.root.clone(); let mut clone_path = self.root.clone();
clone_path.push("clone.lock"); clone_path.push("clone.lock");
return clone_path return clone_path;
} }
fn extra_clone_args(&self) -> Vec<&OsStr> { fn extra_clone_args(&self) -> Vec<&OsStr> {
return vec![ return vec![OsStr::new("--bare")];
OsStr::new("--bare"),
]
} }
} }

View file

@ -1,12 +1,12 @@
use std::path::PathBuf; use std::path::PathBuf;
use fs2::FileExt; use fs2::FileExt;
use std::fs; use std::fs;
use std::io::{Error,ErrorKind}; use std::io::{Error, ErrorKind};
use std::process::Command; use std::process::Command;
use std::ffi::OsStr; use std::ffi::OsStr;
pub struct Lock { pub struct Lock {
lock: Option<fs::File> lock: Option<fs::File>,
} }
impl Lock { impl Lock {
@ -27,24 +27,22 @@ pub trait GitClonable {
match fs::File::create(self.lock_path()) { match fs::File::create(self.lock_path()) {
Err(e) => { Err(e) => {
warn!("Failed to create lock file {:?}: {}", warn!("Failed to create lock file {:?}: {}", self.lock_path(), e);
self.lock_path(), e
);
return Err(e); return Err(e);
} }
Ok(lock) => { Ok(lock) => {
match lock.lock_exclusive() { match lock.lock_exclusive() {
Err(e) => { Err(e) => {
warn!("Failed to get exclusive lock on file {:?}: {}", warn!(
self.lock_path(), e "Failed to get exclusive lock on file {:?}: {}",
self.lock_path(),
e
); );
return Err(e); return Err(e);
} }
Ok(_) => { Ok(_) => {
debug!("Got lock on {:?}", self.lock_path()); debug!("Got lock on {:?}", self.lock_path());
return Ok(Lock{ return Ok(Lock { lock: Some(lock) });
lock: Some(lock)
});
} }
} }
} }
@ -56,14 +54,15 @@ pub trait GitClonable {
let mut lock = self.lock()?; let mut lock = self.lock()?;
if self.clone_to().is_dir() { if self.clone_to().is_dir() {
debug!("Found dir at {:?}, initial clone is done", debug!("Found dir at {:?}, initial clone is done", self.clone_to());
self.clone_to()); return Ok(());
return Ok(())
} }
info!("Initial cloning of {} to {:?}", info!(
self.clone_from(), "Initial cloning of {} to {:?}",
self.clone_to()); self.clone_from(),
self.clone_to()
);
let result = Command::new("git") let result = Command::new("git")
.arg("clone") .arg("clone")
@ -75,7 +74,7 @@ pub trait GitClonable {
lock.unlock(); lock.unlock();
if result.success() { if result.success() {
return Ok(()) return Ok(());
} else { } else {
return Err(Error::new(ErrorKind::Other, "Failed to clone")); return Err(Error::new(ErrorKind::Other, "Failed to clone"));
} }
@ -94,7 +93,7 @@ pub trait GitClonable {
lock.unlock(); lock.unlock();
if result.success() { if result.success() {
return Ok(()) return Ok(());
} else { } else {
return Err(Error::new(ErrorKind::Other, "Failed to fetch")); return Err(Error::new(ErrorKind::Other, "Failed to fetch"));
} }
@ -126,7 +125,7 @@ pub trait GitClonable {
lock.unlock(); lock.unlock();
return Ok(()) return Ok(());
} }
fn checkout(&self, git_ref: &OsStr) -> Result<(), Error> { fn checkout(&self, git_ref: &OsStr) -> Result<(), Error> {
@ -143,7 +142,7 @@ pub trait GitClonable {
lock.unlock(); lock.unlock();
if result.success() { if result.success() {
return Ok(()) return Ok(());
} else { } else {
return Err(Error::new(ErrorKind::Other, "Failed to checkout")); return Err(Error::new(ErrorKind::Other, "Failed to checkout"));
} }

View file

@ -3,7 +3,7 @@ pub fn parse(text: &str) -> Option<Vec<Instruction>> {
let instructions: Vec<Instruction> = text.lines() let instructions: Vec<Instruction> = text.lines()
.map(|s| match parse_line(s) { .map(|s| match parse_line(s) {
Some(instructions) => instructions, Some(instructions) => instructions,
None => vec![] None => vec![],
}) })
.fold(vec![], |mut collector, mut inst| { .fold(vec![], |mut collector, mut inst| {
collector.append(&mut inst); collector.append(&mut inst);
@ -13,13 +13,12 @@ pub fn parse(text: &str) -> Option<Vec<Instruction>> {
if instructions.len() == 0 { if instructions.len() == 0 {
return None; return None;
} else { } else {
return Some(instructions) return Some(instructions);
} }
} }
pub fn parse_line(text: &str) -> Option<Vec<Instruction>> { pub fn parse_line(text: &str) -> Option<Vec<Instruction>> {
let tokens: Vec<String> = text.split_whitespace() let tokens: Vec<String> = text.split_whitespace().map(|s| s.to_owned()).collect();
.map(|s| s.to_owned()).collect();
if tokens.len() < 2 { if tokens.len() < 2 {
return None; return None;
@ -38,23 +37,18 @@ pub fn parse_line(text: &str) -> Option<Vec<Instruction>> {
for command in commands { for command in commands {
let (left, right) = command.split_at(1); let (left, right) = command.split_at(1);
match left[0].as_ref() { match left[0].as_ref() {
"build" => { "build" => instructions.push(Instruction::Build(Subset::Nixpkgs, right.to_vec())),
instructions.push(Instruction::Build(Subset::Nixpkgs, right.to_vec()))
}
"test" => { "test" => {
instructions.push( instructions.push(Instruction::Build(
Instruction::Build(Subset::NixOS, Subset::NixOS,
right right
.into_iter() .into_iter()
.map(|attr| format!("tests.{}.x86_64-linux", attr)) .map(|attr| format!("tests.{}.x86_64-linux", attr))
.collect() .collect(),
) ));
);
} }
"eval" => { "eval" => instructions.push(Instruction::Eval),
instructions.push(Instruction::Eval)
}
_ => {} _ => {}
} }
} }
@ -65,8 +59,7 @@ pub fn parse_line(text: &str) -> Option<Vec<Instruction>> {
#[derive(PartialEq, Debug)] #[derive(PartialEq, Debug)]
pub enum Instruction { pub enum Instruction {
Build(Subset, Vec<String>), Build(Subset, Vec<String>),
Eval Eval,
} }
#[derive(Serialize, Deserialize, Debug, PartialEq)] #[derive(Serialize, Deserialize, Debug, PartialEq)]
@ -89,8 +82,10 @@ mod tests {
fn valid_trailing_instruction() { fn valid_trailing_instruction() {
assert_eq!( assert_eq!(
Some(vec![Instruction::Eval]), Some(vec![Instruction::Eval]),
parse("/cc @grahamc for ^^ parse(
@GrahamcOfBorg eval") "/cc @grahamc for ^^
@GrahamcOfBorg eval",
)
); );
} }
@ -101,50 +96,62 @@ mod tests {
#[test] #[test]
fn eval_comment() { fn eval_comment() {
assert_eq!(Some(vec![Instruction::Eval]), assert_eq!(Some(vec![Instruction::Eval]), parse("@grahamcofborg eval"));
parse("@grahamcofborg eval"));
} }
#[test] #[test]
fn eval_and_build_comment() { fn eval_and_build_comment() {
assert_eq!(Some(vec![ assert_eq!(
Instruction::Eval, Some(vec![
Instruction::Build(Subset::Nixpkgs, vec![ Instruction::Eval,
String::from("foo"), Instruction::Build(
]) Subset::Nixpkgs,
]), vec![String::from("foo")]
parse("@grahamcofborg eval @grahamcofborg build foo")); ),
]),
parse("@grahamcofborg eval @grahamcofborg build foo")
);
} }
#[test] #[test]
fn build_and_eval_and_build_comment() { fn build_and_eval_and_build_comment() {
assert_eq!(Some(vec![ assert_eq!(
Instruction::Build(Subset::Nixpkgs, vec![ Some(vec![
String::from("bar"), Instruction::Build(
Subset::Nixpkgs,
vec![String::from("bar")]
),
Instruction::Eval,
Instruction::Build(
Subset::Nixpkgs,
vec![String::from("foo")]
),
]), ]),
Instruction::Eval, parse(
Instruction::Build(Subset::Nixpkgs, vec![ "
String::from("foo"),
])
]),
parse("
@grahamcofborg build bar @grahamcofborg build bar
@grahamcofborg eval @grahamcofborg eval
@grahamcofborg build foo")); @grahamcofborg build foo",
)
);
} }
#[test] #[test]
fn complex_comment_with_paragraphs() { fn complex_comment_with_paragraphs() {
assert_eq!(Some(vec![ assert_eq!(
Instruction::Build(Subset::Nixpkgs, vec![ Some(vec![
String::from("bar"), Instruction::Build(
Subset::Nixpkgs,
vec![String::from("bar")]
),
Instruction::Eval,
Instruction::Build(
Subset::Nixpkgs,
vec![String::from("foo")]
),
]), ]),
Instruction::Eval, parse(
Instruction::Build(Subset::Nixpkgs, vec![ "
String::from("foo"),
])
]),
parse("
I like where you're going with this PR, so let's try it out! I like where you're going with this PR, so let's try it out!
@grahamcofborg build bar @grahamcofborg build bar
@ -154,70 +161,109 @@ I noticed though that the target branch was broken, which should be fixed. Let's
@grahamcofborg eval @grahamcofborg eval
Also, just in case, let's try foo Also, just in case, let's try foo
@grahamcofborg build foo")); @grahamcofborg build foo",
)
);
} }
#[test] #[test]
fn build_and_eval_comment() { fn build_and_eval_comment() {
assert_eq!(Some(vec![ assert_eq!(
Instruction::Build(Subset::Nixpkgs, vec![ Some(vec![
String::from("foo"), Instruction::Build(
Subset::Nixpkgs,
vec![String::from("foo")]
),
Instruction::Eval,
]), ]),
Instruction::Eval, parse("@grahamcofborg build foo @grahamcofborg eval")
]), );
parse("@grahamcofborg build foo @grahamcofborg eval"));
} }
#[test] #[test]
fn build_comment() { fn build_comment() {
assert_eq!(Some(vec![Instruction::Build(Subset::Nixpkgs, vec![ assert_eq!(
String::from("foo"), Some(vec![
String::from("bar") Instruction::Build(
])]), Subset::Nixpkgs,
parse("@GrahamCOfBorg build foo bar vec![String::from("foo"), String::from("bar")]
),
]),
parse(
"@GrahamCOfBorg build foo bar
baz")); baz",
)
);
} }
#[test] #[test]
fn test_comment() { fn test_comment() {
assert_eq!(Some(vec![Instruction::Build(Subset::NixOS, vec![ assert_eq!(
String::from("tests.foo.x86_64-linux"), Some(vec![
String::from("tests.bar.x86_64-linux"), Instruction::Build(
String::from("tests.baz.x86_64-linux") Subset::NixOS,
])]), vec![
parse("@GrahamCOfBorg test foo bar baz")); String::from("tests.foo.x86_64-linux"),
String::from("tests.bar.x86_64-linux"),
String::from("tests.baz.x86_64-linux"),
]
),
]),
parse("@GrahamCOfBorg test foo bar baz")
);
} }
#[test] #[test]
fn build_comment_newlines() { fn build_comment_newlines() {
assert_eq!(Some(vec![Instruction::Build(Subset::Nixpkgs, vec![ assert_eq!(
String::from("foo"), Some(vec![
String::from("bar"), Instruction::Build(
String::from("baz") Subset::Nixpkgs,
])]), vec![
parse("@GrahamCOfBorg build foo bar baz")); String::from("foo"),
String::from("bar"),
String::from("baz"),
]
),
]),
parse("@GrahamCOfBorg build foo bar baz")
);
} }
#[test] #[test]
fn build_comment_lower() { fn build_comment_lower() {
assert_eq!(Some(vec![Instruction::Build(Subset::Nixpkgs, vec![ assert_eq!(
String::from("foo"), Some(vec![
String::from("bar"), Instruction::Build(
String::from("baz") Subset::Nixpkgs,
])]), vec![
parse("@grahamcofborg build foo bar baz")); String::from("foo"),
String::from("bar"),
String::from("baz"),
]
),
]),
parse("@grahamcofborg build foo bar baz")
);
} }
#[test] #[test]
fn build_comment_lower_package_case_retained() { fn build_comment_lower_package_case_retained() {
assert_eq!(Some(vec![Instruction::Build(Subset::Nixpkgs, vec![ assert_eq!(
String::from("foo"), Some(vec![
String::from("bar"), Instruction::Build(
String::from("baz.Baz") Subset::Nixpkgs,
])]), vec![
parse("@grahamcofborg build foo bar baz.Baz")); String::from("foo"),
String::from("bar"),
String::from("baz.Baz"),
]
),
]),
parse("@grahamcofborg build foo bar baz.Baz")
);
} }
} }

View file

@ -11,8 +11,14 @@ pub struct CommitStatus<'a> {
url: String, url: String,
} }
impl <'a> CommitStatus<'a> { impl<'a> CommitStatus<'a> {
pub fn new(api: hubcaps::statuses::Statuses<'a>, sha: String, context: String, description: String, url: Option<String>) -> CommitStatus<'a> { pub fn new(
api: hubcaps::statuses::Statuses<'a>,
sha: String,
context: String,
description: String,
url: Option<String>,
) -> CommitStatus<'a> {
let mut stat = CommitStatus { let mut stat = CommitStatus {
api: api, api: api,
sha: sha, sha: sha,
@ -23,7 +29,7 @@ impl <'a> CommitStatus<'a> {
stat.set_url(url); stat.set_url(url);
return stat return stat;
} }
pub fn set_url(&mut self, url: Option<String>) { pub fn set_url(&mut self, url: Option<String>) {
@ -40,13 +46,15 @@ impl <'a> CommitStatus<'a> {
} }
pub fn set(&self, state: hubcaps::statuses::State) { pub fn set(&self, state: hubcaps::statuses::State) {
self.api.create( self.api
self.sha.as_ref(), .create(
&hubcaps::statuses::StatusOptions::builder(state) self.sha.as_ref(),
.context(self.context.clone()) &hubcaps::statuses::StatusOptions::builder(state)
.description(self.description.clone()) .context(self.context.clone())
.target_url(self.url.clone()) .description(self.description.clone())
.build() .target_url(self.url.clone())
).expect("Failed to mark final status on commit"); .build(),
)
.expect("Failed to mark final status on commit");
} }
} }

View file

@ -50,7 +50,7 @@ pub struct GithubConfig {
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct RunnerConfig { pub struct RunnerConfig {
pub identity: String, pub identity: String,
pub authorized_users: Option<Vec<String>> pub authorized_users: Option<Vec<String>>,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -64,48 +64,48 @@ impl Config {
} }
pub fn acl(&self) -> acl::ACL { pub fn acl(&self) -> acl::ACL {
return acl::ACL::new( return acl::ACL::new(self.runner.authorized_users.clone().expect(
self.runner.authorized_users "fetching config's runner.authorized_users",
.clone() ));
.expect("fetching config's runner.authorized_users")
);
} }
pub fn github(&self) -> Github { pub fn github(&self) -> Github {
Github::new( Github::new(
"github.com/grahamc/ofborg", "github.com/grahamc/ofborg",
// tls configured hyper client // tls configured hyper client
Client::with_connector( Client::with_connector(HttpsConnector::new(NativeTlsClient::new().unwrap())),
HttpsConnector::new( Credentials::Token(self.github.clone().unwrap().token),
NativeTlsClient::new().unwrap()
)
),
Credentials::Token(self.github.clone().unwrap().token)
) )
} }
pub fn nix(&self) -> Nix { pub fn nix(&self) -> Nix {
if self.nix.build_timeout_seconds < 1200 { if self.nix.build_timeout_seconds < 1200 {
error!("Note: {} is way too low for build_timeout_seconds!", error!(
self.nix.build_timeout_seconds "Note: {} is way too low for build_timeout_seconds!",
self.nix.build_timeout_seconds
); );
error!("Please set build_timeout_seconds to at least 1200"); error!("Please set build_timeout_seconds to at least 1200");
panic!(); panic!();
} }
return Nix::new(self.nix.system.clone(), return Nix::new(
self.nix.remote.clone(), self.nix.system.clone(),
self.nix.build_timeout_seconds self.nix.remote.clone(),
self.nix.build_timeout_seconds,
); );
} }
} }
impl RabbitMQConfig { impl RabbitMQConfig {
pub fn as_uri(&self) -> String{ pub fn as_uri(&self) -> String {
return format!("{}://{}:{}@{}//", return format!(
if self.ssl { "amqps" } else { "amqp" }, "{}://{}:{}@{}//",
self.username, self.password, self.host); if self.ssl { "amqps" } else { "amqp" },
self.username,
self.password,
self.host
);
} }
} }

View file

@ -10,12 +10,11 @@ pub struct EvalChecker {
cmd: String, cmd: String,
args: Vec<String>, args: Vec<String>,
nix: nix::Nix, nix: nix::Nix,
} }
impl EvalChecker { impl EvalChecker {
pub fn new(name: &str, cmd: &str, args: Vec<String>, nix: nix::Nix) -> EvalChecker { pub fn new(name: &str, cmd: &str, args: Vec<String>, nix: nix::Nix) -> EvalChecker {
EvalChecker{ EvalChecker {
name: name.to_owned(), name: name.to_owned(),
cmd: cmd.to_owned(), cmd: cmd.to_owned(),
args: args, args: args,

View file

@ -24,6 +24,4 @@ pub struct Issue {
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PullRequest { pub struct PullRequest {}
}

View file

@ -1,4 +1,4 @@
use ofborg::ghevent::{Comment,Repository,Issue}; use ofborg::ghevent::{Comment, Repository, Issue};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct IssueComment { pub struct IssueComment {

View file

@ -1,4 +1,4 @@
use std::path::PathBuf ; use std::path::PathBuf;
use fs2::FileExt; use fs2::FileExt;
use std::fs; use std::fs;
use std::io::Error; use std::io::Error;
@ -10,14 +10,12 @@ pub trait Lockable {
fn lock(&self) -> Result<Lock, Error> { fn lock(&self) -> Result<Lock, Error> {
let lock = fs::File::create(self.lock_path())?; let lock = fs::File::create(self.lock_path())?;
lock.lock_exclusive()?; lock.lock_exclusive()?;
return Ok(Lock{ return Ok(Lock { lock: Some(lock) });
lock: Some(lock)
})
} }
} }
pub struct Lock { pub struct Lock {
lock: Option<fs::File> lock: Option<fs::File>,
} }
impl Lock { impl Lock {

View file

@ -1,4 +1,4 @@
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr, Repo};
use ofborg::message::buildresult; use ofborg::message::buildresult;
use ofborg::commentparser::Subset; use ofborg::commentparser::Subset;
use ofborg::worker; use ofborg::worker;

View file

@ -1,17 +1,17 @@
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr, Repo};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct BuildLogMsg { pub struct BuildLogMsg {
pub system: String,
pub identity: String,
pub attempt_id: String,
pub line_number: u64, pub line_number: u64,
pub output: String, pub output: String,
pub identity: String,
pub system: String,
pub attempt_id: String,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct BuildLogStart { pub struct BuildLogStart {
pub identity: String,
pub system: String, pub system: String,
pub identity: String,
pub attempt_id: String, pub attempt_id: String,
} }

View file

@ -1,4 +1,4 @@
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr, Repo};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct BuildResult { pub struct BuildResult {
@ -6,5 +6,5 @@ pub struct BuildResult {
pub pr: Pr, pub pr: Pr,
pub system: String, pub system: String,
pub output: Vec<String>, pub output: Vec<String>,
pub success: bool pub success: bool,
} }

View file

@ -1,4 +1,4 @@
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr, Repo};
use ofborg::worker; use ofborg::worker;
use serde_json; use serde_json;
@ -13,20 +13,14 @@ pub struct MassRebuildJob {
pub pr: Pr, pub pr: Pr,
} }
pub struct Actions { pub struct Actions {}
}
impl Actions { impl Actions {
pub fn skip(&mut self, _job: &MassRebuildJob) -> worker::Actions { pub fn skip(&mut self, _job: &MassRebuildJob) -> worker::Actions {
return vec![ return vec![worker::Action::Ack];
worker::Action::Ack
];
} }
pub fn done(&mut self, _job: &MassRebuildJob) -> worker::Actions { pub fn done(&mut self, _job: &MassRebuildJob) -> worker::Actions {
return vec![ return vec![worker::Action::Ack];
worker::Action::Ack
];
} }
} }

View file

@ -5,4 +5,4 @@ pub mod massrebuildjob;
pub mod plasticheartbeat; pub mod plasticheartbeat;
pub mod buildlogmsg; pub mod buildlogmsg;
pub use self::common::{Pr,Repo}; pub use self::common::{Pr, Repo};

View file

@ -4,8 +4,7 @@ extern crate env_logger;
use serde_json; use serde_json;
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct PlasticHeartbeat { pub struct PlasticHeartbeat {}
}
pub fn from(data: &Vec<u8>) -> Result<PlasticHeartbeat, serde_json::error::Error> { pub fn from(data: &Vec<u8>) -> Result<PlasticHeartbeat, serde_json::error::Error> {
return serde_json::from_slice(&data); return serde_json::from_slice(&data);

View file

@ -1,6 +1,6 @@
use std::path::Path; use std::path::Path;
use std::ffi::OsString; use std::ffi::OsString;
use std::process::{Command,Stdio}; use std::process::{Command, Stdio};
use tempfile::tempfile; use tempfile::tempfile;
use std::fs::File; use std::fs::File;
use std::io::Seek; use std::io::Seek;
@ -10,33 +10,43 @@ use std::io::SeekFrom;
pub struct Nix { pub struct Nix {
system: String, system: String,
remote: String, remote: String,
build_timeout: u16 build_timeout: u16,
} }
impl Nix { impl Nix {
pub fn new(system: String, remote: String, build_timeout: u16) -> Nix { pub fn new(system: String, remote: String, build_timeout: u16) -> Nix {
return Nix{ return Nix {
system: system, system: system,
remote: remote, remote: remote,
build_timeout: build_timeout, build_timeout: build_timeout,
} };
} }
pub fn with_system(&self, system: String) -> Nix { pub fn with_system(&self, system: String) -> Nix {
return Nix{ return Nix {
system: system, system: system,
remote: self.remote.clone(), remote: self.remote.clone(),
build_timeout: self.build_timeout, build_timeout: self.build_timeout,
}; };
} }
pub fn safely_build_attrs(&self, nixpkgs: &Path, file: &str, attrs: Vec<String>) -> Result<File,File> { pub fn safely_build_attrs(
&self,
nixpkgs: &Path,
file: &str,
attrs: Vec<String>,
) -> Result<File, File> {
let cmd = self.safely_build_attrs_cmd(nixpkgs, file, attrs); let cmd = self.safely_build_attrs_cmd(nixpkgs, file, attrs);
return self.run(cmd, true); return self.run(cmd, true);
} }
pub fn safely_build_attrs_cmd(&self, nixpkgs: &Path, file: &str, attrs: Vec<String>) -> Command { pub fn safely_build_attrs_cmd(
&self,
nixpkgs: &Path,
file: &str,
attrs: Vec<String>,
) -> Command {
let mut attrargs: Vec<String> = Vec::with_capacity(3 + (attrs.len() * 2)); let mut attrargs: Vec<String> = Vec::with_capacity(3 + (attrs.len() * 2));
attrargs.push(file.to_owned()); attrargs.push(file.to_owned());
attrargs.push(String::from("--no-out-link")); attrargs.push(String::from("--no-out-link"));
@ -49,11 +59,17 @@ impl Nix {
return self.safe_command("nix-build", nixpkgs, attrargs); return self.safe_command("nix-build", nixpkgs, attrargs);
} }
pub fn safely(&self, cmd: &str, nixpkgs: &Path, args: Vec<String>, keep_stdout: bool) -> Result<File,File> { pub fn safely(
&self,
cmd: &str,
nixpkgs: &Path,
args: Vec<String>,
keep_stdout: bool,
) -> Result<File, File> {
return self.run(self.safe_command(cmd, nixpkgs, args), keep_stdout); return self.run(self.safe_command(cmd, nixpkgs, args), keep_stdout);
} }
pub fn run(&self, mut cmd: Command, keep_stdout: bool) -> Result<File,File> { pub fn run(&self, mut cmd: Command, keep_stdout: bool) -> Result<File, File> {
let stderr = tempfile().expect("Fetching a stderr tempfile"); let stderr = tempfile().expect("Fetching a stderr tempfile");
let mut reader = stderr.try_clone().expect("Cloning stderr to the reader"); let mut reader = stderr.try_clone().expect("Cloning stderr to the reader");
@ -66,18 +82,19 @@ impl Nix {
stdout = Stdio::null(); stdout = Stdio::null();
} }
let status = cmd let status = cmd.stdout(Stdio::from(stdout))
.stdout(Stdio::from(stdout))
.stderr(Stdio::from(stderr)) .stderr(Stdio::from(stderr))
.status() .status()
.expect(format!("Running a program ...").as_ref()); .expect(format!("Running a program ...").as_ref());
reader.seek(SeekFrom::Start(0)).expect("Seeking to Start(0)"); reader.seek(SeekFrom::Start(0)).expect(
"Seeking to Start(0)",
);
if status.success() { if status.success() {
return Ok(reader) return Ok(reader);
} else { } else {
return Err(reader) return Err(reader);
} }
} }
@ -94,13 +111,18 @@ impl Nix {
command.env("NIX_REMOTE", &self.remote); command.env("NIX_REMOTE", &self.remote);
command.args(&["--show-trace"]); command.args(&["--show-trace"]);
command.args(&["--option", "restrict-eval", "true"]); command.args(&["--option", "restrict-eval", "true"]);
command.args(&["--option", "build-timeout", &format!("{}", self.build_timeout)]); command.args(
&[
"--option",
"build-timeout",
&format!("{}", self.build_timeout),
],
);
command.args(&["--argstr", "system", &self.system]); command.args(&["--argstr", "system", &self.system]);
command.args(args); command.args(args);
return command; return command;
} }
} }
#[cfg(test)] #[cfg(test)]
@ -127,7 +149,7 @@ mod tests {
Fail, Fail,
} }
fn assert_run(res: Result<File,File>, expected: Expect, require: Vec<&str>) { fn assert_run(res: Result<File, File>, expected: Expect, require: Vec<&str>) {
let expectation_held: bool = match expected { let expectation_held: bool = match expected {
Expect::Pass => res.is_ok(), Expect::Pass => res.is_ok(),
Expect::Fail => res.is_err(), Expect::Fail => res.is_err(),
@ -153,23 +175,18 @@ mod tests {
let total_requirements = require.len(); let total_requirements = require.len();
let mut missed_requirements: usize = 0; let mut missed_requirements: usize = 0;
let requirements_held: Vec<Result<String, String>> = let requirements_held: Vec<Result<String, String>> = require
require.into_iter() .into_iter()
.map(|line| line.to_owned()) .map(|line| line.to_owned())
.map(|line| .map(|line| if buildlog.contains(&line) {
if buildlog.contains(&line) { Ok(line)
Ok(line) } else {
} else { missed_requirements += 1;
missed_requirements += 1; Err(line)
Err(line) })
}
)
.collect(); .collect();
let mut prefixes: Vec<String> = vec![ let mut prefixes: Vec<String> = vec!["".to_owned(), "".to_owned()];
"".to_owned(),
"".to_owned(),
];
if !expectation_held { if !expectation_held {
prefixes.push(format!( prefixes.push(format!(
@ -178,18 +195,16 @@ mod tests {
)); ));
prefixes.push("".to_owned()); prefixes.push("".to_owned());
} else { } else {
prefixes.push(format!( prefixes.push(format!("The run was expected to {:?}, and did.", expected));
"The run was expected to {:?}, and did.",
expected
));
prefixes.push("".to_owned()); prefixes.push("".to_owned());
} }
let mut suffixes = vec![ let mut suffixes = vec![
"".to_owned(), "".to_owned(),
format!("{} out of {} required lines matched.", format!(
(total_requirements - missed_requirements), "{} out of {} required lines matched.",
total_requirements (total_requirements - missed_requirements),
total_requirements
), ),
"".to_owned(), "".to_owned(),
]; ];
@ -199,20 +214,15 @@ mod tests {
} }
suffixes.push("".to_owned()); suffixes.push("".to_owned());
let output_blocks: Vec<Vec<String>> = vec![ let output_blocks: Vec<Vec<String>> =
prefixes, vec![prefixes, vec![buildlog, "".to_owned()], suffixes];
vec![buildlog, "".to_owned()],
suffixes,
];
let output_blocks_strings: Vec<String> = let output_blocks_strings: Vec<String> = output_blocks
output_blocks
.into_iter() .into_iter()
.map(|lines| lines.join("\n")) .map(|lines| lines.join("\n"))
.collect(); .collect();
let output: String = output_blocks_strings let output: String = output_blocks_strings.join("\n");
.join("\n");
if expectation_held && missed_requirements == 0 { if expectation_held && missed_requirements == 0 {
} else { } else {
@ -230,67 +240,78 @@ mod tests {
fn safely_build_attrs_success() { fn safely_build_attrs_success() {
let nix = nix(); let nix = nix();
let ret: Result<File,File> = nix.safely_build_attrs( let ret: Result<File, File> = nix.safely_build_attrs(
build_path().as_path(), build_path().as_path(),
"default.nix", "default.nix",
vec![String::from("success")] vec![String::from("success")],
); );
assert_run(ret, Expect::Pass, vec![ assert_run(
"-success.drv", ret,
"building path(s)", Expect::Pass,
"hi", vec!["-success.drv", "building path(s)", "hi", "-success"],
"-success" );
]);
} }
#[test] #[test]
fn safely_build_attrs_failure() { fn safely_build_attrs_failure() {
let nix = nix(); let nix = nix();
let ret: Result<File,File> = nix.safely_build_attrs( let ret: Result<File, File> = nix.safely_build_attrs(
build_path().as_path(), build_path().as_path(),
"default.nix", "default.nix",
vec![String::from("failed")] vec![String::from("failed")],
); );
assert_run(ret, Expect::Fail, vec![ assert_run(
"-failed.drv", ret,
"building path(s)", Expect::Fail,
"hi", vec![
"failed to produce output path" "-failed.drv",
]); "building path(s)",
"hi",
"failed to produce output path",
],
);
} }
#[test] #[test]
fn strict_sandboxing() { fn strict_sandboxing() {
let ret: Result<File,File> = nix().safely_build_attrs( let ret: Result<File, File> = nix().safely_build_attrs(
build_path().as_path(), build_path().as_path(),
"default.nix", "default.nix",
vec![String::from("sandbox-violation")] vec![String::from("sandbox-violation")],
); );
assert_run(ret, Expect::Fail, vec![ assert_run(
"error: while evaluating the attribute", ret,
"access to path", Expect::Fail,
"is forbidden in restricted mode" vec![
]); "error: while evaluating the attribute",
"access to path",
"is forbidden in restricted mode",
],
);
} }
#[test] #[test]
fn instantiation() { fn instantiation() {
let ret: Result<File,File> = nix().safely( let ret: Result<File, File> = nix().safely(
"nix-instantiate", "nix-instantiate",
passing_eval_path().as_path(), passing_eval_path().as_path(),
vec![], vec![],
true true,
); );
assert_run(ret, Expect::Pass, vec![ assert_run(
"the result might be removed by the garbage collector", ret,
"-failed.drv", Expect::Pass,
"-success.drv" vec![
]); "the result might be removed by the garbage collector",
"-failed.drv",
"-success.drv",
],
);
} }
} }

View file

@ -1,6 +1,6 @@
use amqp::Basic; use amqp::Basic;
use amqp::{Consumer, Channel}; use amqp::{Consumer, Channel};
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
use std::marker::Send; use std::marker::Send;
use worker::Action; use worker::Action;
@ -13,8 +13,12 @@ pub trait SimpleNotifyWorker {
fn consumer(&self, job: &Self::J, notifier: &mut NotificationReceiver); fn consumer(&self, job: &Self::J, notifier: &mut NotificationReceiver);
fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String>; &self,
method: &Deliver,
headers: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String>;
} }
pub trait NotificationReceiver { pub trait NotificationReceiver {
@ -22,18 +26,16 @@ pub trait NotificationReceiver {
} }
pub struct DummyNotificationReceiver { pub struct DummyNotificationReceiver {
pub actions: Vec<Action> pub actions: Vec<Action>,
} }
impl DummyNotificationReceiver { impl DummyNotificationReceiver {
pub fn new() -> DummyNotificationReceiver { pub fn new() -> DummyNotificationReceiver {
DummyNotificationReceiver { DummyNotificationReceiver { actions: vec![] }
actions: vec![],
}
} }
} }
impl NotificationReceiver for DummyNotificationReceiver { impl NotificationReceiver for DummyNotificationReceiver {
fn tell(&mut self, action: Action) { fn tell(&mut self, action: Action) {
self.actions.push(action); self.actions.push(action);
} }
@ -46,59 +48,57 @@ pub struct ChannelNotificationReceiver<'a> {
impl<'a> ChannelNotificationReceiver<'a> { impl<'a> ChannelNotificationReceiver<'a> {
fn new(channel: &'a mut Channel, delivery_tag: u64) -> ChannelNotificationReceiver<'a> { fn new(channel: &'a mut Channel, delivery_tag: u64) -> ChannelNotificationReceiver<'a> {
return ChannelNotificationReceiver{ return ChannelNotificationReceiver {
channel: channel, channel: channel,
delivery_tag: delivery_tag, delivery_tag: delivery_tag,
}; };
} }
} }
impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> { impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
fn tell(&mut self, action: Action) { fn tell(&mut self, action: Action) {
match action { match action {
Action::Ack => { Action::Ack => {
self.channel.basic_ack(self.delivery_tag, false).unwrap(); self.channel.basic_ack(self.delivery_tag, false).unwrap();
} }
Action::NackRequeue => { Action::NackRequeue => {
self.channel.basic_nack(self.delivery_tag, false, true).unwrap(); self.channel
.basic_nack(self.delivery_tag, false, true)
.unwrap();
} }
Action::NackDump => { Action::NackDump => {
self.channel.basic_nack(self.delivery_tag, false, false).unwrap(); self.channel
.basic_nack(self.delivery_tag, false, false)
.unwrap();
} }
Action::Publish(msg) => { Action::Publish(msg) => {
let exch = msg.exchange.clone().unwrap_or("".to_owned()); let exch = msg.exchange.clone().unwrap_or("".to_owned());
let key = msg.routing_key.clone().unwrap_or("".to_owned()); let key = msg.routing_key.clone().unwrap_or("".to_owned());
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()}); let props = msg.properties.unwrap_or(
self.channel.basic_publish( BasicProperties { ..Default::default() },
exch, );
key, self.channel
msg.mandatory, .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
msg.immediate, .unwrap();
props,
msg.content
).unwrap();
} }
} }
} }
} }
pub fn new<T: SimpleNotifyWorker>(worker: T) -> NotifyWorker<T> { pub fn new<T: SimpleNotifyWorker>(worker: T) -> NotifyWorker<T> {
return NotifyWorker{ return NotifyWorker { internal: worker };
internal: worker,
};
} }
impl <T: SimpleNotifyWorker + Send> Consumer for NotifyWorker<T> { impl<T: SimpleNotifyWorker + Send> Consumer for NotifyWorker<T> {
fn handle_delivery(&mut self, fn handle_delivery(
channel: &mut Channel, &mut self,
method: Deliver, channel: &mut Channel,
headers: BasicProperties, method: Deliver,
body: Vec<u8>) { headers: BasicProperties,
let mut receiver = ChannelNotificationReceiver::new( body: Vec<u8>,
channel, ) {
method.delivery_tag let mut receiver = ChannelNotificationReceiver::new(channel, method.delivery_tag);
);
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
self.internal.consumer(&job, &mut receiver); self.internal.consumer(&job, &mut receiver);

View file

@ -26,7 +26,7 @@ impl OutPathDiff {
} }
fn parse(&self, f: File) -> HashMap<String, String> { fn parse(&self, f: File) -> HashMap<String, String> {
let mut result: HashMap<String,String>; let mut result: HashMap<String, String>;
result = HashMap::new(); result = HashMap::new();
{ {
@ -34,7 +34,7 @@ impl OutPathDiff {
.lines() .lines()
.filter_map(|line| match line { .filter_map(|line| match line {
Ok(line) => Some(line), Ok(line) => Some(line),
Err(_) => None Err(_) => None,
}) })
.map(|x| { .map(|x| {
let split: Vec<&str> = x.split_whitespace().collect(); let split: Vec<&str> = x.split_whitespace().collect();
@ -43,7 +43,8 @@ impl OutPathDiff {
} else { } else {
info!("Warning: not 2 word segments in {:?}", split); info!("Warning: not 2 word segments in {:?}", split);
} }
}).count(); })
.count();
} }
return result; return result;
@ -115,8 +116,8 @@ pub struct OutPaths {
check_meta: bool, check_meta: bool,
} }
impl OutPaths { impl OutPaths {
pub fn new(nix: nix::Nix, path: PathBuf, check_meta: bool) -> OutPaths { pub fn new(nix: nix::Nix, path: PathBuf, check_meta: bool) -> OutPaths {
OutPaths { OutPaths {
nix: nix, nix: nix,
path: path, path: path,
@ -132,12 +133,13 @@ impl OutPaths {
self.place_nix(); self.place_nix();
let ret = self.execute(); let ret = self.execute();
self.remove_nix(); self.remove_nix();
return ret return ret;
} }
fn place_nix(&self) { fn place_nix(&self) {
let mut file = File::create(self.nix_path()).expect("Failed to create nix out path check"); let mut file = File::create(self.nix_path()).expect("Failed to create nix out path check");
file.write_all(include_str!("outpaths.nix").as_bytes()).expect("Failed to place outpaths.nix"); file.write_all(include_str!("outpaths.nix").as_bytes())
.expect("Failed to place outpaths.nix");
} }
fn remove_nix(&self) { fn remove_nix(&self) {
@ -169,9 +171,11 @@ impl OutPaths {
String::from("-qaP"), String::from("-qaP"),
String::from("--no-name"), String::from("--no-name"),
String::from("--out-path"), String::from("--out-path"),
String::from("--arg"), String::from("checkMeta"), check_meta, String::from("--arg"),
String::from("checkMeta"),
check_meta,
], ],
true true,
) )
} }
} }

View file

@ -7,27 +7,27 @@ pub trait SysEvents {
} }
pub struct RabbitMQ { pub struct RabbitMQ {
channel: Channel channel: Channel,
} }
impl RabbitMQ { impl RabbitMQ {
pub fn new(channel: Channel) -> RabbitMQ { pub fn new(channel: Channel) -> RabbitMQ {
RabbitMQ { RabbitMQ { channel: channel }
channel: channel
}
} }
} }
impl SysEvents for RabbitMQ { impl SysEvents for RabbitMQ {
fn tick(&mut self, name: &str) { fn tick(&mut self, name: &str) {
let props = BasicProperties{ ..Default::default()}; let props = BasicProperties { ..Default::default() };
self.channel.basic_publish( self.channel
String::from("stats"), .basic_publish(
"".to_owned(), String::from("stats"),
false, "".to_owned(),
false, false,
props, false,
String::from(name).into_bytes() props,
).unwrap(); String::from(name).into_bytes(),
)
.unwrap();
} }
} }

View file

@ -33,7 +33,10 @@ impl StdenvTagger {
for tag in &self.selected { for tag in &self.selected {
if !self.possible.contains(&tag) { if !self.possible.contains(&tag) {
panic!("Tried to add label {} but it isn't in the possible list!", tag); panic!(
"Tried to add label {} but it isn't in the possible list!",
tag
);
} }
} }
} }
@ -88,23 +91,38 @@ impl RebuildTagger {
for attr in attrs { for attr in attrs {
match attr.rsplit(".").next() { match attr.rsplit(".").next() {
Some("x86_64-darwin") => { counter_darwin += 1; } Some("x86_64-darwin") => {
Some("x86_64-linux") => { counter_linux += 1; } counter_darwin += 1;
Some("aarch64-linux") => { } }
Some("i686-linux") => { } Some("x86_64-linux") => {
Some(arch) => { info!("Unknown arch: {:?}", arch); } counter_linux += 1;
None => { info!("Cannot grok attr: {:?}", attr); } }
Some("aarch64-linux") => {}
Some("i686-linux") => {}
Some(arch) => {
info!("Unknown arch: {:?}", arch);
}
None => {
info!("Cannot grok attr: {:?}", attr);
}
} }
} }
self.selected = vec![ self.selected =
String::from(format!("10.rebuild-linux: {}", self.bucket(counter_linux))), vec![
String::from(format!("10.rebuild-darwin: {}", self.bucket(counter_darwin))), String::from(format!("10.rebuild-linux: {}", self.bucket(counter_linux))),
]; String::from(format!(
"10.rebuild-darwin: {}",
self.bucket(counter_darwin)
)),
];
for tag in &self.selected { for tag in &self.selected {
if !self.possible.contains(&tag) { if !self.possible.contains(&tag) {
panic!("Tried to add label {} but it isn't in the possible list!", tag); panic!(
"Tried to add label {} but it isn't in the possible list!",
tag
);
} }
} }
} }
@ -123,7 +141,7 @@ impl RebuildTagger {
return remove; return remove;
} }
fn bucket(&self, count: u64) -> &str{ fn bucket(&self, count: u64) -> &str {
if count > 500 { if count > 500 {
return "501+"; return "501+";
} else if count > 100 { } else if count > 100 {

View file

@ -16,7 +16,7 @@ use ofborg::commentparser;
use ofborg::worker; use ofborg::worker;
use ofborg::notifyworker; use ofborg::notifyworker;
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
pub struct BuildWorker { pub struct BuildWorker {
@ -28,8 +28,14 @@ pub struct BuildWorker {
} }
impl BuildWorker { impl BuildWorker {
pub fn new(cloner: checkout::CachedCloner, nix: nix::Nix, system: String, identity: String, full_logs: bool) -> BuildWorker { pub fn new(
return BuildWorker{ cloner: checkout::CachedCloner,
nix: nix::Nix,
system: String,
identity: String,
full_logs: bool,
) -> BuildWorker {
return BuildWorker {
cloner: cloner, cloner: cloner,
nix: nix, nix: nix,
system: system, system: system,
@ -38,7 +44,11 @@ impl BuildWorker {
}; };
} }
fn actions<'a, 'b>(&self, job: &'b buildjob::BuildJob, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a, 'b> { fn actions<'a, 'b>(
&self,
job: &'b buildjob::BuildJob,
receiver: &'a mut notifyworker::NotificationReceiver,
) -> JobActions<'a, 'b> {
JobActions::new(&self.system, &self.identity, job, receiver) JobActions::new(&self.system, &self.identity, job, receiver)
} }
} }
@ -55,8 +65,16 @@ struct JobActions<'a, 'b> {
} }
impl<'a, 'b> JobActions<'a, 'b> { impl<'a, 'b> JobActions<'a, 'b> {
fn new(system: &str, identity: &str, job: &'b buildjob::BuildJob, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a, 'b> { fn new(
let (log_exchange, log_routing_key) = job.logs.clone().unwrap_or((String::from("logs"), String::from("build.log"))); system: &str,
identity: &str,
job: &'b buildjob::BuildJob,
receiver: &'a mut notifyworker::NotificationReceiver,
) -> JobActions<'a, 'b> {
let (log_exchange, log_routing_key) = job.logs.clone().unwrap_or((
String::from("logs"),
String::from("build.log"),
));
return JobActions { return JobActions {
system: system.to_owned(), system: system.to_owned(),
@ -85,13 +103,13 @@ impl<'a, 'b> JobActions<'a, 'b> {
system: self.system.clone(), system: self.system.clone(),
output: vec![String::from("Merge failed")], output: vec![String::from("Merge failed")],
success: false success: false,
}; };
self.tell(worker::publish_serde_action( self.tell(worker::publish_serde_action(
Some("build-results".to_owned()), Some("build-results".to_owned()),
None, None,
&msg &msg,
)); ));
self.tell(worker::Action::Ack); self.tell(worker::Action::Ack);
} }
@ -111,7 +129,7 @@ impl<'a, 'b> JobActions<'a, 'b> {
self.tell(worker::publish_serde_action( self.tell(worker::publish_serde_action(
log_exchange, log_exchange,
log_routing_key, log_routing_key,
&msg &msg,
)); ));
} }
@ -132,7 +150,7 @@ impl<'a, 'b> JobActions<'a, 'b> {
self.tell(worker::publish_serde_action( self.tell(worker::publish_serde_action(
log_exchange, log_exchange,
log_routing_key, log_routing_key,
&msg &msg,
)); ));
} }
@ -142,13 +160,13 @@ impl<'a, 'b> JobActions<'a, 'b> {
pr: self.job.pr.clone(), pr: self.job.pr.clone(),
system: self.system.clone(), system: self.system.clone(),
output: lines, output: lines,
success: success success: success,
}; };
self.tell(worker::publish_serde_action( self.tell(worker::publish_serde_action(
Some("build-results".to_owned()), Some("build-results".to_owned()),
None, None,
&msg &msg,
)); ));
self.tell(worker::Action::Ack); self.tell(worker::Action::Ack);
} }
@ -161,34 +179,46 @@ impl<'a, 'b> JobActions<'a, 'b> {
impl notifyworker::SimpleNotifyWorker for BuildWorker { impl notifyworker::SimpleNotifyWorker for BuildWorker {
type J = buildjob::BuildJob; type J = buildjob::BuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String> { &self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
println!("lmao I got a job?"); println!("lmao I got a job?");
return match buildjob::from(body) { return match buildjob::from(body) {
Ok(e) => { Ok(e) } Ok(e) => Ok(e),
Err(e) => { Err(e) => {
println!("{:?}", String::from_utf8(body.clone())); println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e); panic!("{:?}", e);
} }
} };
} }
fn consumer(&self, job: &buildjob::BuildJob, notifier: &mut notifyworker::NotificationReceiver) { fn consumer(
&self,
job: &buildjob::BuildJob,
notifier: &mut notifyworker::NotificationReceiver,
) {
let mut actions = self.actions(&job, notifier); let mut actions = self.actions(&job, notifier);
info!("Working on {}", job.pr.number); info!("Working on {}", job.pr.number);
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let project = self.cloner.project(
let co = project.clone_for("builder".to_string(), job.repo.full_name.clone(),
self.identity.clone()).unwrap(); job.repo.clone_url.clone(),
);
let co = project
.clone_for("builder".to_string(), self.identity.clone())
.unwrap();
let target_branch = match job.pr.target_branch.clone() { let target_branch = match job.pr.target_branch.clone() {
Some(x) => { x } Some(x) => x,
None => { String::from("origin/master") } None => String::from("origin/master"),
}; };
let buildfile = match job.subset { let buildfile = match job.subset {
Some(commentparser::Subset::NixOS) => "./nixos/release.nix", Some(commentparser::Subset::NixOS) => "./nixos/release.nix",
_ => "./default.nix" _ => "./default.nix",
}; };
// Note: Don't change the system limiter until the system isn't // Note: Don't change the system limiter until the system isn't
@ -221,7 +251,7 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
let cmd = self.nix.safely_build_attrs_cmd( let cmd = self.nix.safely_build_attrs_cmd(
refpath.as_ref(), refpath.as_ref(),
buildfile, buildfile,
job.attrs.clone() job.attrs.clone(),
); );
actions.log_started(); actions.log_started();
@ -254,10 +284,7 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
let last10lines: Vec<String> = snippet_log.into_iter().collect::<Vec<String>>(); let last10lines: Vec<String> = snippet_log.into_iter().collect::<Vec<String>>();
actions.build_finished( actions.build_finished(success, last10lines.clone());
success,
last10lines.clone()
);
} }
} }
@ -265,17 +292,17 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::path::{Path,PathBuf}; use std::path::{Path, PathBuf};
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr, Repo};
use notifyworker::SimpleNotifyWorker; use notifyworker::SimpleNotifyWorker;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::vec::IntoIter; use std::vec::IntoIter;
fn nix() -> nix::Nix { fn nix() -> nix::Nix {
nix::Nix::new("x86_64-linux".to_owned(), "daemon".to_owned(), 1800) nix::Nix::new("x86_64-linux".to_owned(), "daemon".to_owned(), 1800)
} }
fn tpath(component: &str)-> PathBuf { fn tpath(component: &str) -> PathBuf {
return Path::new(env!("CARGO_MANIFEST_DIR")).join(component); return Path::new(env!("CARGO_MANIFEST_DIR")).join(component);
} }
@ -294,7 +321,6 @@ mod tests {
fn make_worker() -> BuildWorker { fn make_worker() -> BuildWorker {
cleanup_scratch(); cleanup_scratch();
// pub fn new(cloner: checkout::CachedCloner, nix: nix::Nix, system: String, identity: String) -> BuildWorker {
let cloner = checkout::cached_cloner(&scratch_dir()); let cloner = checkout::cached_cloner(&scratch_dir());
let nix = nix(); let nix = nix();
let worker = BuildWorker::new( let worker = BuildWorker::new(
@ -302,13 +328,13 @@ mod tests {
nix, nix,
"x86_64-linux".to_owned(), "x86_64-linux".to_owned(),
"cargo-test-build".to_owned(), "cargo-test-build".to_owned(),
true true,
); );
return worker; return worker;
} }
fn make_pr_repo() -> String{ fn make_pr_repo() -> String {
let output = Command::new("./make-pr.sh") let output = Command::new("./make-pr.sh")
.current_dir(tpath("./test-srcs")) .current_dir(tpath("./test-srcs"))
.stderr(Stdio::null()) .stderr(Stdio::null())
@ -321,41 +347,41 @@ mod tests {
fn assert_contains_job(actions: &mut IntoIter<worker::Action>, text_to_match: &str) { fn assert_contains_job(actions: &mut IntoIter<worker::Action>, text_to_match: &str) {
println!("\n\nSearching for {:?}", text_to_match); println!("\n\nSearching for {:?}", text_to_match);
actions.position(|job| actions
match job { .position(|job| match job {
worker::Action::Publish(ref body) => { worker::Action::Publish(ref body) => {
let mystr = String::from_utf8(body.content.clone()).unwrap(); let mystr = String::from_utf8(body.content.clone()).unwrap();
if mystr.contains(text_to_match) { if mystr.contains(text_to_match) {
println!(" Matched: {:?}", mystr); println!(" Matched: {:?}", mystr);
return true; return true;
} else { } else {
println!(" miss: {:?}", mystr); println!(" miss: {:?}", mystr);
return false; return false;
} }
} }
e => { e => {
println!(" notPublish: {:?}", e); println!(" notPublish: {:?}", e);
return false; return false;
} }
} })
).expect( .expect(&format!(
&format!("Actions should contain a job matching {:?}, after the previous check", "Actions should contain a job matching {:?}, after the previous check",
text_to_match) text_to_match
); ));
} }
#[test] #[test]
pub fn test_simple_build() { pub fn test_simple_build() {
let worker = make_worker(); let worker = make_worker();
let job = buildjob::BuildJob{ let job = buildjob::BuildJob {
attrs: vec!["success".to_owned()], attrs: vec!["success".to_owned()],
pr: Pr{ pr: Pr {
head_sha: make_pr_repo(), head_sha: make_pr_repo(),
number: 1, number: 1,
target_branch: Some("master".to_owned()), target_branch: Some("master".to_owned()),
}, },
repo: Repo{ repo: Repo {
clone_url: tpath("./test-srcs/bare-repo").to_str().unwrap().to_owned(), clone_url: tpath("./test-srcs/bare-repo").to_str().unwrap().to_owned(),
full_name: "test-git".to_owned(), full_name: "test-git".to_owned(),
name: "nixos".to_owned(), name: "nixos".to_owned(),

View file

@ -9,17 +9,17 @@ use hubcaps;
use ofborg::message::{Repo, Pr, buildjob, massrebuildjob}; use ofborg::message::{Repo, Pr, buildjob, massrebuildjob};
use ofborg::worker; use ofborg::worker;
use ofborg::commentparser; use ofborg::commentparser;
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
pub struct GitHubCommentWorker { pub struct GitHubCommentWorker {
acl: acl::ACL, acl: acl::ACL,
github: hubcaps::Github github: hubcaps::Github,
} }
impl GitHubCommentWorker { impl GitHubCommentWorker {
pub fn new(acl: acl::ACL, github: hubcaps::Github) -> GitHubCommentWorker { pub fn new(acl: acl::ACL, github: hubcaps::Github) -> GitHubCommentWorker {
return GitHubCommentWorker{ return GitHubCommentWorker {
acl: acl, acl: acl,
github: github, github: github,
}; };
@ -29,33 +29,42 @@ impl GitHubCommentWorker {
impl worker::SimpleWorker for GitHubCommentWorker { impl worker::SimpleWorker for GitHubCommentWorker {
type J = ghevent::IssueComment; type J = ghevent::IssueComment;
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String> { &mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match serde_json::from_slice(body) { return match serde_json::from_slice(body) {
Ok(e) => { Ok(e) } Ok(e) => Ok(e),
Err(e) => { Err(e) => {
println!("Failed to deserialize IsssueComment: {:?}", String::from_utf8(body.clone())); println!(
"Failed to deserialize IsssueComment: {:?}",
String::from_utf8(body.clone())
);
panic!("{:?}", e); panic!("{:?}", e);
} }
} };
} }
fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions { fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions {
let instructions = commentparser::parse(&job.comment.body); let instructions = commentparser::parse(&job.comment.body);
if instructions == None { if instructions == None {
return vec![ return vec![worker::Action::Ack];
worker::Action::Ack
];
} }
if !self.acl.can_build(&job.comment.user.login, &job.repository.full_name) { if !self.acl.can_build(
println!("ACL prohibits {} from building {:?} for {}", &job.comment.user.login,
job.comment.user.login, &job.repository.full_name,
instructions, )
job.repository.full_name); {
return vec![ println!(
worker::Action::Ack "ACL prohibits {} from building {:?} for {}",
]; job.comment.user.login,
instructions,
job.repository.full_name
);
return vec![worker::Action::Ack];
} }
println!("Got job: {:?}", job); println!("Got job: {:?}", job);
@ -64,20 +73,22 @@ impl worker::SimpleWorker for GitHubCommentWorker {
println!("Instructions: {:?}", instructions); println!("Instructions: {:?}", instructions);
let pr = self.github let pr = self.github
.repo(job.repository.owner.login.clone(), job.repository.name.clone()) .repo(
job.repository.owner.login.clone(),
job.repository.name.clone(),
)
.pulls() .pulls()
.get(job.issue.number) .get(job.issue.number)
.get(); .get();
if let Err(x) = pr { if let Err(x) = pr {
info!("fetching PR {}#{} from GitHub yielded error {}", info!(
job.repository.full_name, "fetching PR {}#{} from GitHub yielded error {}",
job.issue.number, job.repository.full_name,
x job.issue.number,
x
); );
return vec![ return vec![worker::Action::Ack];
worker::Action::Ack
];
} }
let pr = pr.unwrap(); let pr = pr.unwrap();
@ -92,7 +103,7 @@ impl worker::SimpleWorker for GitHubCommentWorker {
let pr_msg = Pr { let pr_msg = Pr {
number: job.issue.number.clone(), number: job.issue.number.clone(),
head_sha: pr.head.sha.clone(), head_sha: pr.head.sha.clone(),
target_branch: Some(pr.base.commit_ref.clone()) target_branch: Some(pr.base.commit_ref.clone()),
}; };
let mut response: Vec<worker::Action> = vec![]; let mut response: Vec<worker::Action> = vec![];
@ -100,22 +111,22 @@ impl worker::SimpleWorker for GitHubCommentWorker {
for instruction in instructions { for instruction in instructions {
match instruction { match instruction {
commentparser::Instruction::Build(subset, attrs) => { commentparser::Instruction::Build(subset, attrs) => {
let msg = buildjob::BuildJob{ let msg = buildjob::BuildJob {
repo: repo_msg.clone(), repo: repo_msg.clone(),
pr: pr_msg.clone(), pr: pr_msg.clone(),
subset: Some(subset), subset: Some(subset),
attrs: attrs, attrs: attrs,
logs: Some(("logs".to_owned(), "build.log".to_owned())) logs: Some(("logs".to_owned(), "build.log".to_owned())),
}; };
response.push(worker::publish_serde_action( response.push(worker::publish_serde_action(
Some("build-jobs".to_owned()), Some("build-jobs".to_owned()),
None, None,
&msg &msg,
)); ));
} }
commentparser::Instruction::Eval => { commentparser::Instruction::Eval => {
let msg = massrebuildjob::MassRebuildJob{ let msg = massrebuildjob::MassRebuildJob {
repo: repo_msg.clone(), repo: repo_msg.clone(),
pr: pr_msg.clone(), pr: pr_msg.clone(),
}; };
@ -123,7 +134,7 @@ impl worker::SimpleWorker for GitHubCommentWorker {
response.push(worker::publish_serde_action( response.push(worker::publish_serde_action(
None, None,
Some("mass-rebuild-check-jobs".to_owned()), Some("mass-rebuild-check-jobs".to_owned()),
&msg &msg,
)); ));
} }

View file

@ -6,94 +6,96 @@ use ofborg::worker;
use ofborg::message::plasticheartbeat; use ofborg::message::plasticheartbeat;
use amqp::Channel; use amqp::Channel;
use amqp::Table; use amqp::Table;
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
use std::process; use std::process;
use amqp::Basic; use amqp::Basic;
struct PlasticHeartbeatWorker { struct PlasticHeartbeatWorker {
queue_name: String queue_name: String,
} }
impl PlasticHeartbeatWorker { impl PlasticHeartbeatWorker {
fn message(&self) -> worker::QueueMsg { fn message(&self) -> worker::QueueMsg {
return worker::QueueMsg{ return worker::QueueMsg {
exchange: None, exchange: None,
routing_key: Some(self.queue_name.clone()), routing_key: Some(self.queue_name.clone()),
mandatory: true, mandatory: true,
immediate: false, immediate: false,
properties: None, properties: None,
content: serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes() content: serde_json::to_string(&plasticheartbeat::PlasticHeartbeat {})
.unwrap()
.into_bytes(),
}; };
} }
} }
impl worker::SimpleWorker for PlasticHeartbeatWorker { impl worker::SimpleWorker for PlasticHeartbeatWorker {
type J = plasticheartbeat::PlasticHeartbeat; type J = plasticheartbeat::PlasticHeartbeat;
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String> { &mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match plasticheartbeat::from(body) { return match plasticheartbeat::from(body) {
Ok(e) => { Ok(e) } Ok(e) => Ok(e),
Err(e) => { Err(e) => {
println!("{:?}", String::from_utf8(body.clone())); println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e); panic!("{:?}", e);
} }
} };
} }
fn consumer(&mut self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions { fn consumer(&mut self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions {
thread::sleep(time::Duration::from_secs(5)); thread::sleep(time::Duration::from_secs(5));
return vec![ return vec![worker::Action::Publish(self.message()), worker::Action::Ack];
worker::Action::Publish(self.message()),
worker::Action::Ack
];
} }
} }
pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) { pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) {
let queue_name = hbchan.queue_declare( let queue_name = hbchan
"", .queue_declare(
false, // passive "",
false, // durable false, // passive
true, // exclusive false, // durable
true, // auto_delete true, // exclusive
false, //nowait true, // auto_delete
Table::new() false, //nowait
) Table::new(),
)
.expect("Failed to declare an anon queue for PlasticHeartbeats!") .expect("Failed to declare an anon queue for PlasticHeartbeats!")
.queue; .queue;
println!("Got personal queue: {:?}", queue_name); println!("Got personal queue: {:?}", queue_name);
hbchan.basic_publish( hbchan
"", .basic_publish(
queue_name.as_ref(), "",
true, // mandatory queue_name.as_ref(),
false, // immediate true, // mandatory
BasicProperties { false, // immediate
..Default::default() BasicProperties { ..Default::default() },
}, serde_json::to_string(&plasticheartbeat::PlasticHeartbeat {})
serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes() .unwrap()
).unwrap(); .into_bytes(),
)
.unwrap();
let worker = move || let worker = move || {
{ hbchan
hbchan.basic_consume( .basic_consume(
worker::new( worker::new(PlasticHeartbeatWorker { queue_name: (&queue_name).clone() }),
PlasticHeartbeatWorker{ queue_name,
queue_name: (&queue_name).clone() String::from(format!("{}-heartbeat", consumer_name)),
} false,
), false,
queue_name, false,
String::from(format!("{}-heartbeat", consumer_name)), false,
false, Table::new(),
false, )
false, .unwrap();
false,
Table::new()
).unwrap();
hbchan.start_consuming(); hbchan.start_consuming();
println!("PlasticHeartbeat failed"); println!("PlasticHeartbeat failed");

View file

@ -14,11 +14,11 @@ use ofborg::nix::Nix;
use ofborg::stats; use ofborg::stats;
use ofborg::worker; use ofborg::worker;
use ofborg::tagger::{StdenvTagger,RebuildTagger}; use ofborg::tagger::{StdenvTagger, RebuildTagger};
use ofborg::outpathdiff::{OutPaths, OutPathDiff}; use ofborg::outpathdiff::{OutPaths, OutPathDiff};
use ofborg::evalchecker::EvalChecker; use ofborg::evalchecker::EvalChecker;
use ofborg::commitstatus::CommitStatus; use ofborg::commitstatus::CommitStatus;
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
use hubcaps; use hubcaps;
pub struct MassRebuildWorker<E> { pub struct MassRebuildWorker<E> {
@ -30,8 +30,14 @@ pub struct MassRebuildWorker<E> {
} }
impl<E: stats::SysEvents> MassRebuildWorker<E> { impl<E: stats::SysEvents> MassRebuildWorker<E> {
pub fn new(cloner: checkout::CachedCloner, nix: Nix, github: hubcaps::Github, identity: String, events: E) -> MassRebuildWorker<E> { pub fn new(
return MassRebuildWorker{ cloner: checkout::CachedCloner,
nix: Nix,
github: hubcaps::Github,
identity: String,
events: E,
) -> MassRebuildWorker<E> {
return MassRebuildWorker {
cloner: cloner, cloner: cloner,
nix: nix, nix: nix,
github: github, github: github,
@ -41,16 +47,19 @@ impl<E: stats::SysEvents> MassRebuildWorker<E> {
} }
fn actions(&self) -> massrebuildjob::Actions { fn actions(&self) -> massrebuildjob::Actions {
return massrebuildjob::Actions{ return massrebuildjob::Actions {};
};
} }
} }
impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> { impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
type J = massrebuildjob::MassRebuildJob; type J = massrebuildjob::MassRebuildJob;
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String> { &mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
self.events.tick("job-received"); self.events.tick("job-received");
return match massrebuildjob::from(body) { return match massrebuildjob::from(body) {
Ok(e) => { Ok(e) => {
@ -59,15 +68,20 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
} }
Err(e) => { Err(e) => {
self.events.tick("job-decode-failure"); self.events.tick("job-decode-failure");
println!("{:?}", String::from_utf8(body.clone())); println!(
panic!("{:?}", e); "Failed to decode message: {:?}",
String::from_utf8(body.clone())
);
Err("Failed to decode message".to_owned())
} }
} };
} }
fn consumer(&mut self, job: &massrebuildjob::MassRebuildJob) -> worker::Actions { fn consumer(&mut self, job: &massrebuildjob::MassRebuildJob) -> worker::Actions {
let repo = self.github let repo = self.github.repo(
.repo(job.repo.owner.clone(), job.repo.name.clone()); job.repo.owner.clone(),
job.repo.name.clone(),
);
let gists = self.github.gists(); let gists = self.github.gists();
let issue = repo.issue(job.pr.number); let issue = repo.issue(job.pr.number);
@ -92,48 +106,49 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
job.pr.head_sha.clone(), job.pr.head_sha.clone(),
"grahamcofborg-eval".to_owned(), "grahamcofborg-eval".to_owned(),
"Starting".to_owned(), "Starting".to_owned(),
None None,
); );
overall_status.set_with_description("Starting", hubcaps::statuses::State::Pending); overall_status.set_with_description("Starting", hubcaps::statuses::State::Pending);
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let project = self.cloner.project(
job.repo.full_name.clone(),
job.repo.clone_url.clone(),
);
overall_status.set_with_description("Cloning project", hubcaps::statuses::State::Pending); overall_status.set_with_description("Cloning project", hubcaps::statuses::State::Pending);
info!("Working on {}", job.pr.number); info!("Working on {}", job.pr.number);
let co = project.clone_for("mr-est".to_string(), let co = project
self.identity.clone()).unwrap(); .clone_for("mr-est".to_string(), self.identity.clone())
.unwrap();
let target_branch = match job.pr.target_branch.clone() { let target_branch = match job.pr.target_branch.clone() {
Some(x) => { x } Some(x) => x,
None => { String::from("master") } None => String::from("master"),
}; };
overall_status.set_with_description( overall_status.set_with_description(
format!("Checking out {}", &target_branch).as_ref(), format!("Checking out {}", &target_branch).as_ref(),
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
info!("Checking out target branch {}", &target_branch); info!("Checking out target branch {}", &target_branch);
let refpath = co.checkout_origin_ref(target_branch.as_ref()).unwrap(); let refpath = co.checkout_origin_ref(target_branch.as_ref()).unwrap();
overall_status.set_with_description( overall_status.set_with_description(
"Checking original stdenvs", "Checking original stdenvs",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
let mut stdenvs = Stdenvs::new(self.nix.clone(), PathBuf::from(&refpath)); let mut stdenvs = Stdenvs::new(self.nix.clone(), PathBuf::from(&refpath));
stdenvs.identify_before(); stdenvs.identify_before();
let mut rebuildsniff = OutPathDiff::new( let mut rebuildsniff = OutPathDiff::new(self.nix.clone(), PathBuf::from(&refpath));
self.nix.clone(),
PathBuf::from(&refpath)
);
overall_status.set_with_description( overall_status.set_with_description(
"Checking original out paths", "Checking original out paths",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
if let Err(mut output) = rebuildsniff.find_before() { if let Err(mut output) = rebuildsniff.find_before() {
@ -146,38 +161,32 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
overall_status.set_with_description( overall_status.set_with_description(
format!("Target branch {} doesn't evaluate!", &target_branch).as_ref(), format!("Target branch {} doesn't evaluate!", &target_branch).as_ref(),
hubcaps::statuses::State::Failure hubcaps::statuses::State::Failure,
); );
return self.actions().skip(&job); return self.actions().skip(&job);
} }
overall_status.set_with_description( overall_status.set_with_description("Fetching PR", hubcaps::statuses::State::Pending);
"Fetching PR",
hubcaps::statuses::State::Pending
);
co.fetch_pr(job.pr.number).unwrap(); co.fetch_pr(job.pr.number).unwrap();
if !co.commit_exists(job.pr.head_sha.as_ref()) { if !co.commit_exists(job.pr.head_sha.as_ref()) {
overall_status.set_with_description( overall_status.set_with_description(
"Commit not found", "Commit not found",
hubcaps::statuses::State::Error hubcaps::statuses::State::Error,
); );
info!("Commit {} doesn't exist", job.pr.head_sha); info!("Commit {} doesn't exist", job.pr.head_sha);
return self.actions().skip(&job); return self.actions().skip(&job);
} }
overall_status.set_with_description( overall_status.set_with_description("Merging PR", hubcaps::statuses::State::Pending);
"Merging PR",
hubcaps::statuses::State::Pending
);
if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) { if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) {
overall_status.set_with_description( overall_status.set_with_description(
"Failed to merge", "Failed to merge",
hubcaps::statuses::State::Failure hubcaps::statuses::State::Failure,
); );
info!("Failed to merge {}", job.pr.head_sha); info!("Failed to merge {}", job.pr.head_sha);
@ -186,14 +195,14 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
overall_status.set_with_description( overall_status.set_with_description(
"Checking new stdenvs", "Checking new stdenvs",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
stdenvs.identify_after(); stdenvs.identify_after();
overall_status.set_with_description( overall_status.set_with_description(
"Checking new out paths", "Checking new out paths",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
if let Err(mut output) = rebuildsniff.find_after() { if let Err(mut output) = rebuildsniff.find_after() {
@ -204,8 +213,11 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
file_to_str(&mut output), file_to_str(&mut output),
)); ));
overall_status.set_with_description( overall_status.set_with_description(
format!("Failed to enumerate outputs after merging to {}", &target_branch).as_ref(), format!(
hubcaps::statuses::State::Failure "Failed to enumerate outputs after merging to {}",
&target_branch
).as_ref(),
hubcaps::statuses::State::Failure,
); );
return self.actions().skip(&job); return self.actions().skip(&job);
} }
@ -213,114 +225,119 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
println!("Got path: {:?}, building", refpath); println!("Got path: {:?}, building", refpath);
overall_status.set_with_description( overall_status.set_with_description(
"Beginning Evaluations", "Beginning Evaluations",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
let eval_checks = vec![ let eval_checks = vec![
EvalChecker::new("package-list", EvalChecker::new(
"nix-env", "package-list",
vec![ "nix-env",
String::from("--file"), vec![
String::from("."), String::from("--file"),
String::from("--query"), String::from("."),
String::from("--available"), String::from("--query"),
String::from("--json"), String::from("--available"),
], String::from("--json"),
self.nix.clone() ],
self.nix.clone()
), ),
EvalChecker::new("nixos-options", EvalChecker::new(
"nix-instantiate", "nixos-options",
vec![ "nix-instantiate",
String::from("./nixos/release.nix"), vec![
String::from("-A"), String::from("./nixos/release.nix"),
String::from("options"), String::from("-A"),
], String::from("options"),
self.nix.clone() ],
self.nix.clone()
), ),
EvalChecker::new("nixos-manual", EvalChecker::new(
"nix-instantiate", "nixos-manual",
vec![ "nix-instantiate",
String::from("./nixos/release.nix"), vec![
String::from("-A"), String::from("./nixos/release.nix"),
String::from("manual"), String::from("-A"),
], String::from("manual"),
self.nix.clone() ],
self.nix.clone()
), ),
EvalChecker::new("nixpkgs-manual", EvalChecker::new(
"nix-instantiate", "nixpkgs-manual",
vec![ "nix-instantiate",
String::from("./pkgs/top-level/release.nix"), vec![
String::from("-A"), String::from("./pkgs/top-level/release.nix"),
String::from("manual"), String::from("-A"),
], String::from("manual"),
self.nix.clone() ],
self.nix.clone()
), ),
EvalChecker::new("nixpkgs-tarball", EvalChecker::new(
"nix-instantiate", "nixpkgs-tarball",
vec![ "nix-instantiate",
String::from("./pkgs/top-level/release.nix"), vec![
String::from("-A"), String::from("./pkgs/top-level/release.nix"),
String::from("tarball"), String::from("-A"),
], String::from("tarball"),
self.nix.clone() ],
self.nix.clone()
), ),
EvalChecker::new("nixpkgs-unstable-jobset", EvalChecker::new(
"nix-instantiate", "nixpkgs-unstable-jobset",
vec![ "nix-instantiate",
String::from("./pkgs/top-level/release.nix"), vec![
String::from("-A"), String::from("./pkgs/top-level/release.nix"),
String::from("unstable"), String::from("-A"),
], String::from("unstable"),
self.nix.clone() ],
self.nix.clone()
), ),
]; ];
let mut eval_results: bool = eval_checks.into_iter() let mut eval_results: bool = eval_checks
.map(|check| .into_iter()
{ .map(|check| {
let mut status = CommitStatus::new( let mut status = CommitStatus::new(
repo.statuses(), repo.statuses(),
job.pr.head_sha.clone(), job.pr.head_sha.clone(),
check.name(), check.name(),
check.cli_cmd(), check.cli_cmd(),
None None,
); );
status.set(hubcaps::statuses::State::Pending); status.set(hubcaps::statuses::State::Pending);
let state: hubcaps::statuses::State; let state: hubcaps::statuses::State;
let gist_url: Option<String>; let gist_url: Option<String>;
match check.execute(Path::new(&refpath)) { match check.execute(Path::new(&refpath)) {
Ok(_) => { Ok(_) => {
state = hubcaps::statuses::State::Success; state = hubcaps::statuses::State::Success;
gist_url = None; gist_url = None;
} }
Err(mut out) => { Err(mut out) => {
state = hubcaps::statuses::State::Failure; state = hubcaps::statuses::State::Failure;
gist_url = make_gist( gist_url = make_gist(
&gists, &gists,
check.name(), check.name(),
Some(format!("{:?}", state)), Some(format!("{:?}", state)),
file_to_str(&mut out), file_to_str(&mut out),
); );
} }
} }
status.set_url(gist_url); status.set_url(gist_url);
status.set(state.clone()); status.set(state.clone());
if state == hubcaps::statuses::State::Success { if state == hubcaps::statuses::State::Success {
return Ok(()) return Ok(());
} else { } else {
return Err(()) return Err(());
} }
} })
)
.all(|status| status == Ok(())); .all(|status| status == Ok(()));
if eval_results { if eval_results {
@ -329,7 +346,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
job.pr.head_sha.clone(), job.pr.head_sha.clone(),
String::from("grahamcofborg-eval-check-meta"), String::from("grahamcofborg-eval-check-meta"),
String::from("config.nix: checkMeta = true"), String::from("config.nix: checkMeta = true"),
None None,
); );
status.set(hubcaps::statuses::State::Pending); status.set(hubcaps::statuses::State::Pending);
@ -337,11 +354,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
let state: hubcaps::statuses::State; let state: hubcaps::statuses::State;
let gist_url: Option<String>; let gist_url: Option<String>;
let checker = OutPaths::new( let checker = OutPaths::new(self.nix.clone(), PathBuf::from(&refpath), true);
self.nix.clone(),
PathBuf::from(&refpath),
true
);
match checker.find() { match checker.find() {
Ok(_) => { Ok(_) => {
state = hubcaps::statuses::State::Success; state = hubcaps::statuses::State::Success;
@ -366,32 +379,35 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
if eval_results { if eval_results {
overall_status.set_with_description( overall_status.set_with_description(
"Calculating Changed Outputs", "Calculating Changed Outputs",
hubcaps::statuses::State::Pending hubcaps::statuses::State::Pending,
); );
let mut stdenvtagger = StdenvTagger::new(); let mut stdenvtagger = StdenvTagger::new();
if !stdenvs.are_same() { if !stdenvs.are_same() {
stdenvtagger.changed(stdenvs.changed()); stdenvtagger.changed(stdenvs.changed());
} }
update_labels(&issue, stdenvtagger.tags_to_add(), update_labels(
stdenvtagger.tags_to_remove()); &issue,
stdenvtagger.tags_to_add(),
stdenvtagger.tags_to_remove(),
);
let mut rebuild_tags = RebuildTagger::new(); let mut rebuild_tags = RebuildTagger::new();
if let Some(attrs) = rebuildsniff.calculate_rebuild() { if let Some(attrs) = rebuildsniff.calculate_rebuild() {
rebuild_tags.parse_attrs(attrs); rebuild_tags.parse_attrs(attrs);
} }
update_labels(&issue, rebuild_tags.tags_to_add(), update_labels(
rebuild_tags.tags_to_remove()); &issue,
rebuild_tags.tags_to_add(),
overall_status.set_with_description( rebuild_tags.tags_to_remove(),
"^.^!",
hubcaps::statuses::State::Success
); );
overall_status.set_with_description("^.^!", hubcaps::statuses::State::Success);
} else { } else {
overall_status.set_with_description( overall_status.set_with_description(
"Complete, with errors", "Complete, with errors",
hubcaps::statuses::State::Failure hubcaps::statuses::State::Failure,
); );
} }
@ -401,13 +417,13 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
enum StdenvFrom { enum StdenvFrom {
Before, Before,
After After,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum System { pub enum System {
X8664Darwin, X8664Darwin,
X8664Linux X8664Linux,
} }
#[derive(Debug, PartialEq)] #[derive(Debug, PartialEq)]
@ -433,7 +449,7 @@ impl Stdenvs {
darwin_stdenv_before: None, darwin_stdenv_before: None,
darwin_stdenv_after: None, darwin_stdenv_after: None,
} };
} }
fn identify_before(&mut self) { fn identify_before(&mut self) {
@ -462,7 +478,7 @@ impl Stdenvs {
} }
return changed return changed;
} }
fn identify(&mut self, system: System, from: StdenvFrom) { fn identify(&mut self, system: System, from: StdenvFrom) {
@ -485,50 +501,62 @@ impl Stdenvs {
fn evalstdenv(&self, system: &str) -> Option<String> { fn evalstdenv(&self, system: &str) -> Option<String> {
let result = self.nix.with_system(system.to_owned()).safely( let result = self.nix.with_system(system.to_owned()).safely(
"nix-instantiate", &self.co, vec![ "nix-instantiate",
&self.co,
vec![
String::from("."), String::from("."),
String::from("-A"), String::from("-A"),
String::from("stdenv"), String::from("stdenv"),
], ],
true true,
); );
println!("{:?}", result); println!("{:?}", result);
return match result { return match result {
Ok(mut out) => { Ok(mut out) => file_to_drv(&mut out),
file_to_drv(&mut out)
}
Err(mut out) => { Err(mut out) => {
println!("{:?}", file_to_str(&mut out)); println!("{:?}", file_to_str(&mut out));
None None
} }
} };
} }
} }
fn make_gist<'a>(gists: &hubcaps::gists::Gists<'a>, name: String, description: Option<String>, contents: String) -> Option<String> { fn make_gist<'a>(
gists: &hubcaps::gists::Gists<'a>,
name: String,
description: Option<String>,
contents: String,
) -> Option<String> {
let mut files = HashMap::new(); let mut files = HashMap::new();
files.insert(name.clone(), files.insert(
hubcaps::gists::Content { name.clone(),
filename: Some(name.clone()), hubcaps::gists::Content {
content: contents, filename: Some(name.clone()),
} content: contents,
},
); );
return Some(gists.create( return Some(
&hubcaps::gists::GistOptions { gists
description: description, .create(&hubcaps::gists::GistOptions {
public: Some(true), description: description,
files: files, public: Some(true),
} files: files,
).expect("Failed to create gist!").html_url); })
.expect("Failed to create gist!")
.html_url,
);
} }
pub fn update_labels(issue: &hubcaps::issues::IssueRef, add: Vec<String>, remove: Vec<String>) { pub fn update_labels(issue: &hubcaps::issues::IssueRef, add: Vec<String>, remove: Vec<String>) {
let l = issue.labels(); let l = issue.labels();
let existing: Vec<String> = issue.get().unwrap().labels let existing: Vec<String> = issue
.get()
.unwrap()
.labels
.iter() .iter()
.map(|l| l.name.clone()) .map(|l| l.name.clone())
.collect(); .collect();
@ -556,30 +584,31 @@ pub fn update_labels(issue: &hubcaps::issues::IssueRef, add: Vec<String>, remove
fn file_to_drv(f: &mut File) -> Option<String> { fn file_to_drv(f: &mut File) -> Option<String> {
let r = BufReader::new(f); let r = BufReader::new(f);
let matches: Vec<String>; let matches: Vec<String>;
matches = r.lines().filter_map(|x| matches = r.lines()
match x { .filter_map(|x| match x {
Ok(line) => { Ok(line) => {
if !line.starts_with("/nix/store/") { if !line.starts_with("/nix/store/") {
debug!("Skipping line, not /nix/store: {}", line); debug!("Skipping line, not /nix/store: {}", line);
return None return None;
} }
if !line.ends_with(".drv") { if !line.ends_with(".drv") {
debug!("Skipping line, not .drv: {}", line); debug!("Skipping line, not .drv: {}", line);
return None return None;
} }
return Some(line) return Some(line);
} }
Err(_) => None Err(_) => None,
}).collect(); })
.collect();
if matches.len() == 1 { if matches.len() == 1 {
return Some(matches.first().unwrap().clone()); return Some(matches.first().unwrap().clone());
} else { } else {
info!("Got wrong number of matches: {}", matches.len()); info!("Got wrong number of matches: {}", matches.len());
info!("Matches: {:?}", matches); info!("Matches: {:?}", matches);
return None return None;
} }
} }
@ -597,7 +626,11 @@ mod tests {
#[test] #[test]
fn stdenv_checking() { fn stdenv_checking() {
let nix = Nix::new(String::from("x86_64-linux"), String::from("daemon"), 1200); let nix = Nix::new(String::from("x86_64-linux"), String::from("daemon"), 1200);
let mut stdenv = Stdenvs::new(nix.clone(), PathBuf::from("/nix/var/nix/profiles/per-user/root/channels/nixos/nixpkgs")); let mut stdenv =
Stdenvs::new(
nix.clone(),
PathBuf::from("/nix/var/nix/profiles/per-user/root/channels/nixos/nixpkgs"),
);
stdenv.identify(System::X8664Linux, StdenvFrom::Before); stdenv.identify(System::X8664Linux, StdenvFrom::Before);
stdenv.identify(System::X8664Darwin, StdenvFrom::Before); stdenv.identify(System::X8664Darwin, StdenvFrom::Before);

View file

@ -1,21 +1,20 @@
use amqp::Basic; use amqp::Basic;
use amqp::{Consumer, Channel}; use amqp::{Consumer, Channel};
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
use std::marker::Send; use std::marker::Send;
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
use std::cmp::PartialEq; use std::cmp::PartialEq;
pub struct Worker<T: SimpleWorker> { pub struct Worker<T: SimpleWorker> {
internal: T internal: T,
} }
pub struct Response { pub struct Response {}
}
pub type Actions = Vec<Action>; pub type Actions = Vec<Action>;
#[derive(Debug,PartialEq)] #[derive(Debug, PartialEq)]
pub enum Action { pub enum Action {
Ack, Ack,
NackRequeue, NackRequeue,
@ -23,7 +22,7 @@ pub enum Action {
Publish(QueueMsg), Publish(QueueMsg),
} }
#[derive(Debug,PartialEq)] #[derive(Debug, PartialEq)]
pub struct QueueMsg { pub struct QueueMsg {
pub exchange: Option<String>, pub exchange: Option<String>,
pub routing_key: Option<String>, pub routing_key: Option<String>,
@ -33,21 +32,26 @@ pub struct QueueMsg {
pub content: Vec<u8>, pub content: Vec<u8>,
} }
pub fn publish_serde_action<T: ?Sized>(exchange: Option<String>, routing_key: Option<String>, msg: &T) -> Action pub fn publish_serde_action<T: ?Sized>(
where exchange: Option<String>,
T: Serialize { routing_key: Option<String>,
msg: &T,
) -> Action
where
T: Serialize,
{
let props = BasicProperties { let props = BasicProperties {
content_type: Some("application/json".to_owned()), content_type: Some("application/json".to_owned()),
..Default::default() ..Default::default()
}; };
return Action::Publish(QueueMsg{ return Action::Publish(QueueMsg {
exchange: exchange, exchange: exchange,
routing_key: routing_key, routing_key: routing_key,
mandatory: true, mandatory: true,
immediate: false, immediate: false,
properties: Some(props), properties: Some(props),
content: serde_json::to_string(&msg).unwrap().into_bytes() content: serde_json::to_string(&msg).unwrap().into_bytes(),
}); });
} }
@ -56,24 +60,28 @@ pub trait SimpleWorker {
fn consumer(&mut self, job: &Self::J) -> Actions; fn consumer(&mut self, job: &Self::J) -> Actions;
fn msg_to_job(&mut self, method: &Deliver, headers: &BasicProperties, fn msg_to_job(
body: &Vec<u8>) -> Result<Self::J, String>; &mut self,
method: &Deliver,
headers: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String>;
} }
pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> { pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> {
return Worker{ return Worker { internal: worker };
internal: worker,
};
} }
impl <T: SimpleWorker + Send> Consumer for Worker<T> { impl<T: SimpleWorker + Send> Consumer for Worker<T> {
fn handle_delivery(&mut self, fn handle_delivery(
channel: &mut Channel, &mut self,
method: Deliver, channel: &mut Channel,
headers: BasicProperties, method: Deliver,
body: Vec<u8>) { headers: BasicProperties,
body: Vec<u8>,
) {
@ -84,24 +92,25 @@ impl <T: SimpleWorker + Send> Consumer for Worker<T> {
channel.basic_ack(method.delivery_tag, false).unwrap(); channel.basic_ack(method.delivery_tag, false).unwrap();
} }
Action::NackRequeue => { Action::NackRequeue => {
channel.basic_nack(method.delivery_tag, false, true).unwrap(); channel
.basic_nack(method.delivery_tag, false, true)
.unwrap();
} }
Action::NackDump => { Action::NackDump => {
channel.basic_nack(method.delivery_tag, false, false).unwrap(); channel
.basic_nack(method.delivery_tag, false, false)
.unwrap();
} }
Action::Publish(msg) => { Action::Publish(msg) => {
let exch = msg.exchange.clone().unwrap_or("".to_owned()); let exch = msg.exchange.clone().unwrap_or("".to_owned());
let key = msg.routing_key.clone().unwrap_or("".to_owned()); let key = msg.routing_key.clone().unwrap_or("".to_owned());
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()}); let props = msg.properties.unwrap_or(
channel.basic_publish( BasicProperties { ..Default::default() },
exch, );
key, channel
msg.mandatory, .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
msg.immediate, .unwrap();
props,
msg.content
).unwrap();
} }
} }
} }

View file

@ -27,6 +27,7 @@ let
nix-prefetch-git nix-prefetch-git
rust.rustc rust.rustc
rust.cargo rust.cargo
rustfmt
carnix carnix
openssl.dev openssl.dev
pkgconfig pkgconfig