Send log messages

This commit is contained in:
Graham Christensen 2018-01-19 21:59:54 -05:00
parent 67869640ec
commit 47f3ba4bef
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C

View file

@ -1,15 +1,17 @@
extern crate amqp; extern crate amqp;
extern crate env_logger; extern crate env_logger;
use std::collections::LinkedList; use std::collections::VecDeque;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::io::BufRead; use std::io::BufRead;
use ofborg::asynccmd::AsyncCmd;
use ofborg::checkout; use ofborg::checkout;
use ofborg::message::buildjob; use ofborg::message::buildjob;
use ofborg::message::buildresult; use ofborg::message::buildresult;
use ofborg::message::buildlogmsg;
use ofborg::nix; use ofborg::nix;
use ofborg::commentparser; use ofborg::commentparser;
@ -35,38 +37,42 @@ impl BuildWorker {
}; };
} }
fn actions<'a>(&self, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a> { fn actions<'a, 'b>(&self, job: &'b buildjob::BuildJob, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a, 'b> {
JobActions::new(&self.system, &self.identity, receiver) JobActions::new(&self.system, &self.identity, job, receiver)
} }
} }
struct JobActions<'a> { struct JobActions<'a, 'b> {
system: String, system: String,
identity: String, identity: String,
receiver: &'a mut notifyworker::NotificationReceiver receiver: &'a mut notifyworker::NotificationReceiver,
job: &'b buildjob::BuildJob,
line_counter: u64,
} }
impl<'a> JobActions<'a> { impl<'a, 'b> JobActions<'a, 'b> {
fn new(system: &str, identity: &str, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a> { fn new(system: &str, identity: &str, job: &'b buildjob::BuildJob, receiver: &'a mut notifyworker::NotificationReceiver) -> JobActions<'a, 'b> {
return JobActions { return JobActions {
system: system.to_owned(), system: system.to_owned(),
identity: system.to_owned(), identity: system.to_owned(),
receiver: receiver, receiver: receiver,
job: job,
line_counter: 0,
}; };
} }
pub fn commit_missing(&mut self, _job: &buildjob::BuildJob) { pub fn commit_missing(&mut self) {
self.tell(worker::Action::Ack); self.tell(worker::Action::Ack);
} }
pub fn nasty_hack_linux_only(&mut self, _job: &buildjob::BuildJob) { pub fn nasty_hack_linux_only(&mut self) {
self.tell(worker::Action::Ack); self.tell(worker::Action::Ack);
} }
pub fn merge_failed(&mut self, job: &buildjob::BuildJob) { pub fn merge_failed(&mut self) {
let msg = buildresult::BuildResult { let msg = buildresult::BuildResult {
repo: job.repo.clone(), repo: self.job.repo.clone(),
pr: job.pr.clone(), pr: self.job.pr.clone(),
system: self.system.clone(), system: self.system.clone(),
output: vec![String::from("Merge failed")], output: vec![String::from("Merge failed")],
@ -81,10 +87,28 @@ impl<'a> JobActions<'a> {
self.tell(worker::Action::Ack); self.tell(worker::Action::Ack);
} }
pub fn build_finished(&mut self, job: &buildjob::BuildJob, success: bool, lines: Vec<String>) { pub fn log_line(&mut self, line: &str) {
self.line_counter += 1;
let msg = buildlogmsg::BuildLogMsg {
identity: self.identity.clone(),
system: self.system.clone(),
line_number: self.line_counter,
output: line.to_owned(),
};
self.tell(worker::publish_serde_action(
Some("logs".to_owned()),
Some("build.log".to_owned()),
&msg
));
self.tell(worker::Action::Ack);
}
pub fn build_finished(&mut self, success: bool, lines: Vec<String>) {
let msg = buildresult::BuildResult { let msg = buildresult::BuildResult {
repo: job.repo.clone(), repo: self.job.repo.clone(),
pr: job.pr.clone(), pr: self.job.pr.clone(),
system: self.system.clone(), system: self.system.clone(),
output: lines, output: lines,
success: success success: success
@ -119,7 +143,7 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
} }
fn consumer(&self, job: &buildjob::BuildJob, notifier: &mut notifyworker::NotificationReceiver) { fn consumer(&self, job: &buildjob::BuildJob, notifier: &mut notifyworker::NotificationReceiver) {
let mut actions = self.actions(notifier); let mut actions = self.actions(&job, notifier);
info!("Working on {}", job.pr.number); info!("Working on {}", job.pr.number);
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone());
@ -141,7 +165,7 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
if buildfile == "./nixos/release.nix" && self.system != "x86_64-linux" { if buildfile == "./nixos/release.nix" && self.system != "x86_64-linux" {
// NixOS jobs get routed to all builders, even though darwin // NixOS jobs get routed to all builders, even though darwin
// cannot build them. // cannot build them.
actions.nasty_hack_linux_only(&job); actions.nasty_hack_linux_only();
return; return;
} }
@ -150,13 +174,13 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
if !co.commit_exists(job.pr.head_sha.as_ref()) { if !co.commit_exists(job.pr.head_sha.as_ref()) {
info!("Commit {} doesn't exist", job.pr.head_sha); info!("Commit {} doesn't exist", job.pr.head_sha);
actions.commit_missing(&job); actions.commit_missing();
return; return;
} }
if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) { if let Err(_) = co.merge_commit(job.pr.head_sha.as_ref()) {
info!("Failed to merge {}", job.pr.head_sha); info!("Failed to merge {}", job.pr.head_sha);
actions.merge_failed(&job); actions.merge_failed();
return; return;
} }
@ -165,46 +189,36 @@ impl notifyworker::SimpleNotifyWorker for BuildWorker {
let success: bool; let success: bool;
let reader: BufReader<File>; let reader: BufReader<File>;
match self.nix.safely_build_attrs(refpath.as_ref(), let cmd = self.nix.safely_build_attrs_cmd(
buildfile, refpath.as_ref(),
job.attrs.clone()) { buildfile,
Ok(r) => { job.attrs.clone()
success = true;
reader = BufReader::new(r);
}
Err(r) => {
success = false;
reader = BufReader::new(r);
}
}
println!("ok built ({:?}), building", success);
let l10 = reader.lines().fold(LinkedList::new(),
|mut coll, line|
{
match line {
Ok(e) => { coll.push_back(e); }
Err(wtf) => {
println!("Got err in lines: {:?}", wtf);
coll.push_back(String::from("<line omitted due to error>"));
}
}
if coll.len() == 11 {
coll.pop_front();
}
return coll
}
); );
println!("Lines: {:?}", l10);
let last10lines = l10.into_iter().collect::<Vec<_>>(); let acmd = AsyncCmd::new(cmd);
let mut spawned = acmd.spawn();
let mut snippet_log = VecDeque::with_capacity(10);
for line in spawned.lines().iter() {
actions.log_line(&line);
if snippet_log.len() >= 10 {
snippet_log.pop_front();
}
snippet_log.push_back(line.to_owned());
}
let success = spawned.wait().success();
println!("ok built ({:?}), building", success);
println!("Lines: {:?}", snippet_log);
let last10lines: Vec<String> = snippet_log.into_iter().collect::<Vec<String>>();
actions.build_finished( actions.build_finished(
&job,
success, success,
last10lines.clone() last10lines.clone()
); );
@ -276,6 +290,9 @@ mod tests {
let mut dummyreceiver = notifyworker::DummyNotificationReceiver::new(); let mut dummyreceiver = notifyworker::DummyNotificationReceiver::new();
worker.consumer(&job, &mut dummyreceiver); worker.consumer(&job, &mut dummyreceiver);
panic!("{:?}", dummyreceiver.actions);
let actions = dummyreceiver.actions.iter();
println!("{:?}", actions);
} }
} }