From c3ffe349e3f5cb0a7647222dfacb90202ce5296b Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 1 Feb 2018 10:28:38 -0500 Subject: [PATCH] Make writetoline not be so dumb ordered took: Duration { secs: 0, nanos: 15943489 } reversed took: Duration { secs: 5, nanos: 490028410 } old ordered took: Duration { secs: 11, nanos: 176841460 } old reversed took: Duration { secs: 19, nanos: 555249871 } --- ofborg/src/tasks/log_message_collector.rs | 17 +- ofborg/src/writetoline.rs | 299 ++++++++++++++++++---- 2 files changed, 259 insertions(+), 57 deletions(-) diff --git a/ofborg/src/tasks/log_message_collector.rs b/ofborg/src/tasks/log_message_collector.rs index 13b4715..091dec4 100644 --- a/ofborg/src/tasks/log_message_collector.rs +++ b/ofborg/src/tasks/log_message_collector.rs @@ -7,7 +7,7 @@ use std::fs; use std::fs::{OpenOptions, File}; use std::path::{Component, PathBuf}; -use ofborg::writetoline; +use ofborg::writetoline::LineWriter; use ofborg::message::buildlogmsg::BuildLogMsg; use ofborg::worker; use amqp::protocol::basic::{Deliver, BasicProperties}; @@ -19,7 +19,7 @@ pub struct LogFrom { } pub struct LogMessageCollector { - handles: LruCache, + handles: LruCache, log_root: PathBuf, } @@ -58,15 +58,16 @@ impl LogMessageCollector { }; } - pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut File, String> { + pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut LineWriter, String> { if self.handles.contains_key(&from) { return Ok(self.handles.get_mut(&from).expect( "handles just contained the key", )); } else { let logpath = self.path_for(&from)?; - let handle = self.open_log(logpath)?; - self.handles.insert(from.clone(), handle); + let fp = self.open_log(logpath)?; + let writer = LineWriter::new(fp); + self.handles.insert(from.clone(), writer); if let Some(handle) = self.handles.get_mut(&from) { return Ok(handle); } else { @@ -150,11 +151,7 @@ impl worker::SimpleWorker for LogMessageCollector { fn consumer(&mut self, job: &LogMessage) -> worker::Actions { let mut handle = self.handle_for(&job.from).unwrap(); - writetoline::write_to_line( - &mut handle, - (job.message.line_number - 1) as usize, - &job.message.output, - ); + handle.write_to_line((job.message.line_number - 1) as usize, &job.message.output); return vec![worker::Action::Ack]; } diff --git a/ofborg/src/writetoline.rs b/ofborg/src/writetoline.rs index 1bc06ed..21670bb 100644 --- a/ofborg/src/writetoline.rs +++ b/ofborg/src/writetoline.rs @@ -5,35 +5,87 @@ use std::io::Seek; use std::io::SeekFrom; use std::fs::File; -pub fn write_to_line(rw: &mut File, line: usize, data: &str) { +pub struct LineWriter { + file: File, + buffer: Vec, + last_line: usize, +} - rw.seek(SeekFrom::Start(0)).unwrap(); +impl LineWriter { + pub fn new(mut rw: File) -> LineWriter { + let buf = LineWriter::load_buffer(&mut rw); + let len = buf.len(); - let reader = BufReader::new(rw.try_clone().unwrap()); - let mut lines: Vec = reader - .lines() - .map(|line| match line { - Ok(s) => s, - Err(e) => format!("UTF-8 Decode err: {:?}", e), - }) - .collect(); - while lines.len() <= line { - lines.push("".to_owned()); + let writer = LineWriter { + file: rw, + buffer: buf, + last_line: len, + }; + + return writer; } - lines.remove(line); - lines.insert(line, data.to_owned()); + fn load_buffer(file: &mut File) -> Vec { + file.seek(SeekFrom::Start(0)).unwrap(); - let writeout = lines.join("\n"); + let reader = BufReader::new(file.try_clone().unwrap()); + reader + .lines() + .map(|line| match line { + Ok(s) => s, + Err(e) => format!("UTF-8 Decode err: {:?}", e), + }) + .collect() + } - rw.set_len(0).unwrap(); - rw.seek(SeekFrom::Start(0)).unwrap(); + pub fn write_to_line(&mut self, line: usize, data: &str) { + let buffer_len = self.buffer.len(); - let bytes = writeout.as_bytes(); - rw.write_all(bytes).unwrap(); - rw.write("\n".as_bytes()).unwrap(); + let original_len = self.buffer.len(); + while self.buffer.len() <= line { + self.buffer.push("".to_owned()); + } + + self.buffer.remove(line); + self.buffer.insert(line, data.to_owned()); + + if self.last_line > line { + // println!("taking the rewrite option"); + // We're inserting in to the middle of a file, so just + // write the entire buffer again + self.file.set_len(0).unwrap(); + self.file.seek(SeekFrom::Start(0)).unwrap(); + self.file + .write_all(self.buffer.join("\n").as_bytes()) + .unwrap(); + self.file.write("\n".as_bytes()); + } else { + // println!("taking the append option"); + // println!("Writing {:?} to line {}", data, line); + + let buffer_start = original_len; + let buffer_end = line + 1; + let to_write = self.buffer[buffer_start..buffer_end].join("\n"); + // println!("Full buffer: {:?}", self.buffer); + // println!("buffer[{}..{}] = {:?}", buffer_start, buffer_end, to_write); + // Inclusive range syntax (ie: ...) is experimental, so + // to include the final newline in to the written buffer + // we have to use one more than the range we want for the + // end + // println!("selected buffer: {:?}", to_write); + self.file.write(to_write.as_bytes()).unwrap(); + self.file.write("\n".as_bytes()); + } + + self.last_line = line; + } + + pub fn inner(mut self) -> File { + self.file + } } + #[cfg(test)] mod tests { use super::*; @@ -42,6 +94,7 @@ mod tests { use std::io::Read; use std::fs::OpenOptions; use ofborg::test_scratch::TestScratch; + use std::time::Instant; fn testfile(path: &Path) -> File { OpenOptions::new() @@ -69,11 +122,23 @@ mod tests { let mut f = testfile(&p.path()); assert_file_content(&mut f, ""); - write_to_line(&mut f, 0, "hello"); + + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + f = writer.inner(); + assert_file_content(&mut f, "hello\n"); - write_to_line(&mut f, 1, "world"); + + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "world"); + f = writer.inner(); + assert_file_content(&mut f, "hello\nworld\n"); - write_to_line(&mut f, 2, ":)"); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + f = writer.inner(); + assert_file_content(&mut f, "hello\nworld\n:)\n"); } @@ -83,68 +148,173 @@ mod tests { let mut f = testfile(&p.path()); assert_file_content(&mut f, ""); - write_to_line(&mut f, 2, ":)"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + f = writer.inner(); + } + assert_file_content(&mut f, "\n\n:)\n"); - write_to_line(&mut f, 1, "world"); + { + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "world"); + f = writer.inner(); + } + assert_file_content(&mut f, "\nworld\n:)\n"); - write_to_line(&mut f, 0, "hello"); + { + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + f = writer.inner(); + } + assert_file_content(&mut f, "hello\nworld\n:)\n"); } - #[test] fn test_writer_line_unordered_long() { let p = TestScratch::new_file("writetoline-unordered-long"); let mut f = testfile(&p.path()); assert_file_content(&mut f, ""); - write_to_line( - &mut f, - 2, - "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", - ); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 2, + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + ); + f = writer.inner(); + } assert_file_content( &mut f, "\n\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", ); - write_to_line( - &mut f, - 1, - "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", - ); + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 1, + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + ); + f = writer.inner(); + } assert_file_content( &mut f, "\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", ); - write_to_line( - &mut f, - 0, - "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", - ); + { + let mut writer = LineWriter::new(f); + writer.write_to_line( + 0, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", + ); + f = writer.inner(); + } assert_file_content( &mut f, "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", ); } - #[test] fn test_writer_line_unordered_longish() { let p = TestScratch::new_file("writetoline-unordered-longish"); let mut f = testfile(&p.path()); assert_file_content(&mut f, ""); - write_to_line(&mut f, 2, "hello"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(2, "hello"); + f = writer.inner(); + } assert_file_content(&mut f, "\n\nhello\n"); - write_to_line(&mut f, 1, "mynameis"); + { + let mut writer = LineWriter::new(f); + writer.write_to_line(1, "mynameis"); + f = writer.inner(); + } assert_file_content(&mut f, "\nmynameis\nhello\n"); - write_to_line(&mut f, 0, "graham"); + { + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "graham"); + f = writer.inner(); + } + assert_file_content(&mut f, "graham\nmynameis\nhello\n"); + } + + #[test] + fn test_writer_line_ordered_result() { + let p = TestScratch::new_file("writetoline-ordered-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(0, "hello"); + writer.write_to_line(1, "world"); + writer.write_to_line(2, ":)"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered_result() { + let p = TestScratch::new_file("writetoline-unordered-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, ":)"); + writer.write_to_line(1, "world"); + writer.write_to_line(0, "hello"); + f = writer.inner(); + + assert_file_content(&mut f, "hello\nworld\n:)\n"); + } + + #[test] + fn test_writer_line_unordered_long_result() { + let p = TestScratch::new_file("writetoline-unordered-long-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line( + 2, + "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA", + ); + writer.write_to_line( + 1, + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB", + ); + writer.write_to_line( + 0, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC", + ); + f = writer.inner(); + + assert_file_content( + &mut f, + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC\nBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\nAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\n", + ); + } + + #[test] + fn test_writer_line_unordered_longish_result() { + let p = TestScratch::new_file("writetoline-unordered-longish-result"); + let mut f = testfile(&p.path()); + + let mut writer = LineWriter::new(f); + writer.write_to_line(2, "hello"); + writer.write_to_line(1, "mynameis"); + writer.write_to_line(0, "graham"); + f = writer.inner(); + assert_file_content(&mut f, "graham\nmynameis\nhello\n"); } @@ -154,7 +324,42 @@ mod tests { let mut f = testfile(&p.path()); assert_file_content(&mut f, ""); - write_to_line(&mut f, 5, "hello"); + + { + let mut writer = LineWriter::new(f); + writer.write_to_line(5, "hello"); + f = writer.inner(); + } assert_file_content(&mut f, "\n\n\n\n\nhello\n"); } + + #[test] + fn bench_lots_of_ordered_lines() { + let p = TestScratch::new_file("bench-ordered-lines"); + let mut f = testfile(&p.path()); + let mut writer = LineWriter::new(f); + + let timer = Instant::now(); + + for i in 0..3000 { + writer.write_to_line(i, "This is my line!"); + } + + println!("ordered took: {:?}", timer.elapsed()); + } + + #[test] + fn bench_lots_of_reversed_lines() { + let p = TestScratch::new_file("bench-reversed-lines"); + let mut f = testfile(&p.path()); + let mut writer = LineWriter::new(f); + + let timer = Instant::now(); + + for i in (0..3000).rev() { + writer.write_to_line(i, "This is my line!"); + } + + println!("reversed took: {:?}", timer.elapsed()); + } }