Use bounded queues for the asynccmd task

This commit is contained in:
Graham Christensen 2018-01-26 10:06:43 -05:00
parent e71bcfa20b
commit 116c094877
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C

View file

@ -3,16 +3,25 @@ use std::thread;
use std::collections::HashMap;
use std::process::Stdio;
use std::process::ExitStatus;
use std::sync::mpsc::channel;
use std::sync::mpsc::sync_channel;
use std::process::Command;
use std::io::Read;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc::{SyncSender, Receiver};
use std::io::BufReader;
use std::io::BufRead;
use std::io;
use std::process::Child;
use std::thread::JoinHandle;
// Specifically set to fall under 1/2 of the AMQP library's
// SyncSender limitation.
const OUT_CHANNEL_BUFFER_SIZE: usize = 30;
// The waiter channel should never be over 3 items: process, stderr,
// stdout, and thusly probably could be unbounded just fine, but what
// the heck.
const WAITER_CHANNEL_BUFFER_SIZE: usize = 10;
pub struct AsyncCmd {
command: Command,
}
@ -35,7 +44,7 @@ enum WaitResult<T> {
Process(Result<ExitStatus,io::Error>),
}
fn reader_tx<R: 'static + Read + Send>(read: R, tx: Sender<String>) -> thread::JoinHandle<()> {
fn reader_tx<R: 'static + Read + Send>(read: R, tx: SyncSender<String>) -> thread::JoinHandle<()> {
let read = BufReader::new(read);
thread::spawn(move || {
@ -56,7 +65,7 @@ fn reader_tx<R: 'static + Read + Send>(read: R, tx: Sender<String>) -> thread::J
}
fn spawn_join<T: Send + 'static>(id: WaitTarget, tx: Sender<(WaitTarget, WaitResult<T>)>, waiting_on: thread::JoinHandle<T>) -> thread::JoinHandle<()> {
fn spawn_join<T: Send + 'static>(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult<T>)>, waiting_on: thread::JoinHandle<T>) -> thread::JoinHandle<()> {
thread::spawn(move || {
if let Err(e) = tx.send((id, WaitResult::Thread(waiting_on.join()))) {
error!("Failed to send message to the thread waiter: {:?}", e);
@ -64,7 +73,7 @@ fn spawn_join<T: Send + 'static>(id: WaitTarget, tx: Sender<(WaitTarget, WaitRes
})
}
fn child_wait<T: Send + 'static>(id: WaitTarget, tx: Sender<(WaitTarget, WaitResult<T>)>, mut waiting_on: Child) -> thread::JoinHandle<()> {
fn child_wait<T: Send + 'static>(id: WaitTarget, tx: SyncSender<(WaitTarget, WaitResult<T>)>, mut waiting_on: Child) -> thread::JoinHandle<()> {
thread::spawn(move || {
if let Err(e) = tx.send((id, WaitResult::Process(waiting_on.wait()))) {
error!("Failed to send message to the thread waiter: {:?}", e);
@ -85,8 +94,8 @@ impl AsyncCmd {
.spawn()
.unwrap();
let (monitor_tx, monitor_rx) = channel();
let (proc_tx, proc_rx) = channel();
let (monitor_tx, monitor_rx) = sync_channel(WAITER_CHANNEL_BUFFER_SIZE);
let (proc_tx, proc_rx) = sync_channel(OUT_CHANNEL_BUFFER_SIZE);
let mut waiters: HashMap<WaitTarget, thread::JoinHandle<()>> = HashMap::with_capacity(3);
waiters.insert(