clean up whitespace

This commit is contained in:
Graham Christensen 2018-01-27 14:41:39 -05:00
parent b2e34d91d9
commit 7511eb771b
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
3 changed files with 77 additions and 55 deletions

View file

@ -27,7 +27,7 @@ pub struct AsyncCmd {
}
pub struct SpawnedAsyncCmd {
waiter: JoinHandle<(Option<Result<ExitStatus,io::Error>>)>,
waiter: JoinHandle<(Option<Result<ExitStatus, io::Error>>)>,
rx: Receiver<String>,
}
@ -41,43 +41,57 @@ enum WaitTarget {
#[derive(Debug)]
enum WaitResult<T> {
Thread(thread::Result<T>),
Process(Result<ExitStatus,io::Error>),
Process(Result<ExitStatus, io::Error>),
}
fn reader_tx<R: 'static + Read + Send>(read: R, tx: SyncSender<String>) -> thread::JoinHandle<()> {
let read = BufReader::new(read);
thread::spawn(move || {
for line in read.lines() {
let to_send: String = match line {
Ok(line) => line,
Err(e) => {
error!("Error reading data in reader_tx: {:?}", e);
"Non-UTF8 data omitted from the log.".to_owned()
}
};
if let Err(e) = tx.send(to_send) {
error!("Failed to send log line: {:?}", e);
thread::spawn(move || for line in read.lines() {
let to_send: String = match line {
Ok(line) => line,
Err(e) => {
error!("Error reading data in reader_tx: {:?}", e);
"Non-UTF8 data omitted from the log.".to_owned()
}
};
if let Err(e) = tx.send(to_send) {
error!("Failed to send log line: {:?}", e);
}
})
}
fn spawn_join<T: Send + 'static>(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult<T>)>, waiting_on: thread::JoinHandle<T>) -> thread::JoinHandle<()> {
thread::spawn(move || {
if let Err(e) = tx.send((id, WaitResult::Thread(waiting_on.join()))) {
error!("Failed to send message to the thread waiter: {:?}", e);
}
fn spawn_join<T: Send + 'static>(
id: WaitTarget,
tx: SyncSender<(WaitTarget, WaitResult<T>)>,
waiting_on: thread::JoinHandle<T>,
) -> thread::JoinHandle<()> {
thread::spawn(move || if let Err(e) = tx.send((
id,
WaitResult::Thread(
waiting_on.join(),
),
))
{
error!("Failed to send message to the thread waiter: {:?}", e);
})
}
fn child_wait<T: Send + 'static>(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult<T>)>, mut waiting_on: Child) -> thread::JoinHandle<()> {
thread::spawn(move || {
if let Err(e) = tx.send((id, WaitResult::Process(waiting_on.wait()))) {
error!("Failed to send message to the thread waiter: {:?}", e);
}
fn child_wait<T: Send + 'static>(
id: WaitTarget,
tx: SyncSender<(WaitTarget, WaitResult<T>)>,
mut waiting_on: Child,
) -> thread::JoinHandle<()> {
thread::spawn(move || if let Err(e) = tx.send((
id,
WaitResult::Process(
waiting_on.wait(),
),
))
{
error!("Failed to send message to the thread waiter: {:?}", e);
})
}
@ -103,8 +117,8 @@ impl AsyncCmd {
spawn_join(
WaitTarget::Stderr,
monitor_tx.clone(),
reader_tx(child.stderr.take().unwrap(), proc_tx.clone())
)
reader_tx(child.stderr.take().unwrap(), proc_tx.clone()),
),
);
waiters.insert(
@ -112,21 +126,17 @@ impl AsyncCmd {
spawn_join(
WaitTarget::Stdout,
monitor_tx.clone(),
reader_tx(child.stdout.take().unwrap(), proc_tx.clone())
)
reader_tx(child.stdout.take().unwrap(), proc_tx.clone()),
),
);
waiters.insert(
WaitTarget::Child,
child_wait(
WaitTarget::Child,
monitor_tx.clone(),
child
)
child_wait(WaitTarget::Child, monitor_tx.clone(), child),
);
let head_waiter = thread::spawn(move || {
let mut return_status: Option<Result<ExitStatus,io::Error>> = None;
let mut return_status: Option<Result<ExitStatus, io::Error>> = None;
for (id, interior_result) in monitor_rx.iter() {
match waiters.remove(&id) {
@ -147,7 +157,12 @@ impl AsyncCmd {
}
}
None => { error!("Received notice that {:?} finished, but it isn't being waited on?", id); }
None => {
error!(
"Received notice that {:?} finished, but it isn't being waited on?",
id
);
}
}
if waiters.len() == 0 {
@ -156,7 +171,10 @@ impl AsyncCmd {
}
}
info!("Out of the child waiter recv, with {:?} remaining waits", waiters.len());
info!(
"Out of the child waiter recv, with {:?} remaining waits",
waiters.len()
);
return return_status;
});
@ -185,12 +203,12 @@ impl SpawnedAsyncCmd {
mod tests {
use super::AsyncCmd;
use std::process::Command;
use std::ffi::{OsStr,OsString};
use std::ffi::{OsStr, OsString};
use std::os::unix::ffi::OsStrExt;
#[test]
fn basic_echo_test() {
let mut cmd = Command::new("/bin/sh");
let mut cmd = Command::new("/bin/sh");
cmd.arg("-c");
cmd.arg("echo hi");
let acmd = AsyncCmd::new(cmd);
@ -204,7 +222,7 @@ mod tests {
#[test]
fn basic_interpolation_test() {
let mut cmd = Command::new("stdbuf");
let mut cmd = Command::new("stdbuf");
cmd.arg("-o0");
cmd.arg("-e0");
cmd.arg("bash");
@ -224,9 +242,11 @@ mod tests {
#[test]
fn lots_of_small_ios_test() {
let mut cmd = Command::new("/bin/sh");
let mut cmd = Command::new("/bin/sh");
cmd.arg("-c");
cmd.arg("for i in `seq 1 100`; do (seq 1 100)& (seq 1 100 >&2)& wait; wait; done");
cmd.arg(
"for i in `seq 1 100`; do (seq 1 100)& (seq 1 100 >&2)& wait; wait; done",
);
let acmd = AsyncCmd::new(cmd);
let mut spawned = acmd.spawn();
@ -234,7 +254,9 @@ mod tests {
assert_eq!(lines.len(), 20000);
let thread_result = spawned.wait();
let child_result_opt = thread_result.expect("Thread should exit correctly");
let child_result = child_result_opt.expect("Thread should have properly properly returned the child's status");
let child_result = child_result_opt.expect(
"Thread should have properly properly returned the child's status",
);
let exit_status = child_result.expect("The child should have no problem exiting");
assert_eq!(true, exit_status.success());
}
@ -242,7 +264,7 @@ mod tests {
#[test]
fn lots_of_io_test() {
let mut cmd = Command::new("/bin/sh");
let mut cmd = Command::new("/bin/sh");
cmd.arg("-c");
cmd.arg("seq 1 100000; seq 1 100000 >&2");
let acmd = AsyncCmd::new(cmd);
@ -252,7 +274,9 @@ mod tests {
assert_eq!(lines.len(), 200000);
let thread_result = spawned.wait();
let child_result_opt = thread_result.expect("Thread should exit correctly");
let child_result = child_result_opt.expect("Thread should have properly properly returned the child's status");
let child_result = child_result_opt.expect(
"Thread should have properly properly returned the child's status",
);
let exit_status = child_result.expect("The child should have no problem exiting");
assert_eq!(true, exit_status.success());
}
@ -263,18 +287,17 @@ mod tests {
echos.push(OsStr::from_bytes(&[0xffu8]));
echos.push("; echo there;");
let mut cmd = Command::new("/bin/sh");
let mut cmd = Command::new("/bin/sh");
cmd.arg("-c");
cmd.arg(echos);
let acmd = AsyncCmd::new(cmd);
let mut spawned = acmd.spawn();
let lines: Vec<String> = spawned.lines().into_iter().collect();
assert_eq!(lines, vec![
"hi",
"Non-UTF8 data omitted from the log.",
"there",
]);
assert_eq!(
lines,
vec!["hi", "Non-UTF8 data omitted from the log.", "there"]
);
let ret = spawned.wait().unwrap().unwrap().unwrap().success();
assert_eq!(true, ret);
}

View file

@ -19,7 +19,7 @@ use ofborg::commentparser;
use ofborg::message::buildjob;
use ofborg::message::{Pr,Repo};
use ofborg::message::{Pr, Repo};
use ofborg::tasks;
@ -64,7 +64,7 @@ fn main() {
repo: repo_msg.clone(),
pr: pr_msg.clone(),
subset: Some(commentparser::Subset::Nixpkgs),
attrs: vec![ "success".to_owned() ],
attrs: vec!["success".to_owned()],
logs: Some((Some("logs".to_owned()), Some(logbackrk.to_lowercase()))),
statusreport: Some((None, Some("scratch".to_owned()))),
};

View file

@ -1,9 +1,9 @@
use std::fs;
use std::path::{Path,PathBuf};
use std::path::{Path, PathBuf};
use std::process::Command;
pub struct TestScratch {
root: PathBuf
root: PathBuf,
}
impl TestScratch {
@ -34,7 +34,6 @@ impl TestScratch {
pub fn path(&self) -> PathBuf {
self.root.clone()
}
}
impl Drop for TestScratch {