diff --git a/ofborg/src/asynccmd.rs b/ofborg/src/asynccmd.rs index 815181f..33389d7 100644 --- a/ofborg/src/asynccmd.rs +++ b/ofborg/src/asynccmd.rs @@ -3,16 +3,25 @@ use std::thread; use std::collections::HashMap; use std::process::Stdio; use std::process::ExitStatus; -use std::sync::mpsc::channel; +use std::sync::mpsc::sync_channel; use std::process::Command; use std::io::Read; -use std::sync::mpsc::{Sender, Receiver}; +use std::sync::mpsc::{SyncSender, Receiver}; use std::io::BufReader; use std::io::BufRead; use std::io; use std::process::Child; use std::thread::JoinHandle; +// Specifically set to fall under 1/2 of the AMQP library's +// SyncSender limitation. +const OUT_CHANNEL_BUFFER_SIZE: usize = 30; + +// The waiter channel should never be over 3 items: process, stderr, +// stdout, and thusly probably could be unbounded just fine, but what +// the heck. +const WAITER_CHANNEL_BUFFER_SIZE: usize = 10; + pub struct AsyncCmd { command: Command, } @@ -35,7 +44,7 @@ enum WaitResult { Process(Result), } -fn reader_tx(read: R, tx: Sender) -> thread::JoinHandle<()> { +fn reader_tx(read: R, tx: SyncSender) -> thread::JoinHandle<()> { let read = BufReader::new(read); thread::spawn(move || { @@ -56,7 +65,7 @@ fn reader_tx(read: R, tx: Sender) -> thread::J } -fn spawn_join(id: WaitTarget, tx: Sender<(WaitTarget, WaitResult)>, waiting_on: thread::JoinHandle) -> thread::JoinHandle<()> { +fn spawn_join(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult)>, waiting_on: thread::JoinHandle) -> thread::JoinHandle<()> { thread::spawn(move || { if let Err(e) = tx.send((id, WaitResult::Thread(waiting_on.join()))) { error!("Failed to send message to the thread waiter: {:?}", e); @@ -64,7 +73,7 @@ fn spawn_join(id: WaitTarget, tx: Sender<(WaitTarget, WaitRes }) } -fn child_wait(id: WaitTarget, tx: Sender<(WaitTarget, WaitResult)>, mut waiting_on: Child) -> thread::JoinHandle<()> { +fn child_wait(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult)>, mut waiting_on: Child) -> thread::JoinHandle<()> { thread::spawn(move || { if let Err(e) = tx.send((id, WaitResult::Process(waiting_on.wait()))) { error!("Failed to send message to the thread waiter: {:?}", e); @@ -85,8 +94,8 @@ impl AsyncCmd { .spawn() .unwrap(); - let (monitor_tx, monitor_rx) = channel(); - let (proc_tx, proc_rx) = channel(); + let (monitor_tx, monitor_rx) = sync_channel(WAITER_CHANNEL_BUFFER_SIZE); + let (proc_tx, proc_rx) = sync_channel(OUT_CHANNEL_BUFFER_SIZE); let mut waiters: HashMap> = HashMap::with_capacity(3); waiters.insert(