diff --git a/nix/ofborg-carnix.nix b/nix/ofborg-carnix.nix index ae804a3..b22d24e 100644 --- a/nix/ofborg-carnix.nix +++ b/nix/ofborg-carnix.nix @@ -182,6 +182,13 @@ rec { sha256 = "1bxsh6fags7nr36vlz07ik2a1rzyipc8x1y30kjk832hf2pzadmw"; inherit dependencies buildDependencies features; }; + either_1_4_0_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { + crateName = "either"; + version = "1.4.0"; + authors = [ "bluss" ]; + sha256 = "04kpfd84lvyrkb2z4sljlz2d3d5qczd0sb1yy37fgijq2yx3vb37"; + inherit dependencies buildDependencies features; + }; enum_primitive_0_1_1_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { crateName = "enum_primitive"; version = "0.1.1"; @@ -974,6 +981,17 @@ rec { dtoa_0_4_2_features = f: updateFeatures f (rec { dtoa_0_4_2.default = (f.dtoa_0_4_2.default or true); }) []; + either_1_4_0 = { features?(either_1_4_0_features {}) }: either_1_4_0_ { + dependencies = mapFeatures features ([]); + features = mkFeatures (features.either_1_4_0 or {}); + }; + either_1_4_0_features = f: updateFeatures f (rec { + either_1_4_0.default = (f.either_1_4_0.default or true); + either_1_4_0.use_std = + (f.either_1_4_0.use_std or false) || + (f.either_1_4_0.default or false) || + (either_1_4_0.default or false); + }) []; enum_primitive_0_1_1 = { features?(enum_primitive_0_1_1_features {}) }: enum_primitive_0_1_1_ { dependencies = mapFeatures features ([ num_traits_0_1_41 ]); }; @@ -1325,10 +1343,11 @@ rec { num_cpus_1_8_0.default = (f.num_cpus_1_8_0.default or true); }) [ libc_0_2_36_features ]; ofborg_0_1_1 = { features?(ofborg_0_1_1_features {}) }: ofborg_0_1_1_ { - dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); + dependencies = mapFeatures features ([ amqp_0_1_0 either_1_4_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); }; ofborg_0_1_1_features = f: updateFeatures f (rec { amqp_0_1_0.default = true; + either_1_4_0.default = true; env_logger_0_4_3.default = true; fs2_0_4_3.default = true; hubcaps_0_3_16.default = true; @@ -1344,7 +1363,7 @@ rec { tempfile_2_2_0.default = true; uuid_0_4_0.default = true; uuid_0_4_0.v4 = true; - }) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; + }) [ amqp_0_1_0_features either_1_4_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; openssl_0_9_23 = { features?(openssl_0_9_23_features {}) }: openssl_0_9_23_ { dependencies = mapFeatures features ([ bitflags_0_9_1 foreign_types_0_3_2 lazy_static_1_0_0 libc_0_2_36 openssl_sys_0_9_24 ]); features = mkFeatures (features.openssl_0_9_23 or {}); diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index e766847..beaf34a 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -132,6 +132,11 @@ name = "dtoa" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "either" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "enum_primitive" version = "0.1.1" @@ -377,6 +382,7 @@ name = "ofborg" version = "0.1.1" dependencies = [ "amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git)", + "either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git)", @@ -761,6 +767,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67" "checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" +"checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3" "checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 9607cc4..d7e3f78 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -7,6 +7,7 @@ build = "build.rs" [dependencies] +either = "1.4.0" log = "= 0.3.8" env_logger = "= 0.4.3" # amqp = { path = "./rust-amqp/" } # for testing patches diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index bda70db..509efa1 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -9,7 +9,7 @@ extern crate log; extern crate hubcaps; extern crate hyper; extern crate hyper_native_tls; - +extern crate either; extern crate lru_cache; extern crate tempfile; extern crate amqp; diff --git a/ofborg/src/message/buildlogmsg.rs b/ofborg/src/message/buildlogmsg.rs index e88379b..00f44dc 100644 --- a/ofborg/src/message/buildlogmsg.rs +++ b/ofborg/src/message/buildlogmsg.rs @@ -1,5 +1,5 @@ -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct BuildLogMsg { pub system: String, pub identity: String, @@ -8,7 +8,7 @@ pub struct BuildLogMsg { pub output: String, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct BuildLogStart { pub system: String, pub identity: String, diff --git a/ofborg/src/tasks/log_message_collector.rs b/ofborg/src/tasks/log_message_collector.rs index 64d1e81..623c36a 100644 --- a/ofborg/src/tasks/log_message_collector.rs +++ b/ofborg/src/tasks/log_message_collector.rs @@ -1,14 +1,16 @@ extern crate amqp; extern crate env_logger; +use either::{Either, Left, Right}; use lru_cache::LruCache; use serde_json; use std::fs; use std::fs::{OpenOptions, File}; use std::path::{Component, PathBuf}; +use std::io::Write; use ofborg::writetoline::LineWriter; -use ofborg::message::buildlogmsg::BuildLogMsg; +use ofborg::message::buildlogmsg::{BuildLogStart, BuildLogMsg}; use ofborg::worker; use amqp::protocol::basic::{Deliver, BasicProperties}; @@ -26,7 +28,7 @@ pub struct LogMessageCollector { #[derive(Debug)] pub struct LogMessage { from: LogFrom, - message: BuildLogMsg, + message: Either, } fn validate_path_segment(segment: &PathBuf) -> Result<(), String> { @@ -58,14 +60,32 @@ impl LogMessageCollector { }; } + pub fn write_metadata(&mut self, from: &LogFrom, data: &BuildLogStart) -> Result<(), String>{ + let metapath = self.path_for_metadata(&from)?; + let mut fp = self.open_file(metapath)?; + + match serde_json::to_string(data) { + Ok(data) => { + if let Err(e) = fp.write(&data.as_bytes()) { + Err(format!("Failed to write metadata: {:?}", e)) + } else { + Ok(()) + } + }, + Err(e) => { + Err(format!("Failed to stringify metadata: {:?}", e)) + } + } + } + pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut LineWriter, String> { if self.handles.contains_key(&from) { return Ok(self.handles.get_mut(&from).expect( "handles just contained the key", )); } else { - let logpath = self.path_for(&from)?; - let fp = self.open_log(logpath)?; + let logpath = self.path_for_log(&from)?; + let fp = self.open_file(logpath)?; let writer = LineWriter::new(fp); self.handles.insert(from.clone(), writer); if let Some(handle) = self.handles.get_mut(&from) { @@ -78,7 +98,13 @@ impl LogMessageCollector { } } - fn path_for(&self, from: &LogFrom) -> Result { + fn path_for_metadata(&self, from: &LogFrom) -> Result { + let mut path = self.path_for_log(from)?; + path.set_extension("metadata.json"); + return Ok(path); + } + + fn path_for_log(&self, from: &LogFrom) -> Result { let mut location = self.log_root.clone(); let routing_key = PathBuf::from(from.routing_key.clone()); @@ -100,7 +126,7 @@ impl LogMessageCollector { } } - fn open_log(&self, path: PathBuf) -> Result { + fn open_file(&self, path: PathBuf) -> Result { let dir = path.parent().unwrap(); fs::create_dir_all(dir).unwrap(); @@ -114,7 +140,7 @@ impl LogMessageCollector { match attempt { Ok(handle) => Ok(handle), Err(e) => Err(format!( - "Failed to open the log file for {:?}, err: {:?}", + "Failed to open the file for {:?}, err: {:?}", &path, e )), @@ -144,14 +170,22 @@ impl worker::SimpleWorker for LogMessageCollector { routing_key: deliver.routing_key.clone(), attempt_id: message.attempt_id.clone(), }, - message: message, + message: Right(message), }) } fn consumer(&mut self, job: &LogMessage) -> worker::Actions { - let handle = self.handle_for(&job.from).unwrap(); + match job.message { + Left(ref start) => { + self.write_metadata(&job.from, &start).expect("failed to write metadata"); + }, + Right(ref message) => { + let handle = self.handle_for(&job.from).unwrap(); - handle.write_to_line((job.message.line_number - 1) as usize, &job.message.output); + handle.write_to_line((message.line_number - 1) as usize, + &message.output); + } + } return vec![worker::Action::Ack]; } @@ -194,12 +228,29 @@ mod tests { } #[test] - fn test_path_for() { - let p = TestScratch::new_dir("log-message-collector-path_for"); + fn test_path_for_metadata() { + let p = TestScratch::new_dir("log-message-collector-path_for_metadata"); let worker = make_worker(p.path()); let path = worker - .path_for(&LogFrom { + .path_for_metadata(&LogFrom { + attempt_id: String::from("my-attempt-id"), + routing_key: String::from("my-routing-key"), + }) + .expect("the path should be valid"); + + + assert!(path.starts_with(p.path())); + assert!(path.as_os_str().to_string_lossy().ends_with("my-routing-key/my-attempt-id.metadata.json")); + } + + #[test] + fn test_path_for_log() { + let p = TestScratch::new_dir("log-message-collector-path_for_log"); + let worker = make_worker(p.path()); + + let path = worker + .path_for_log(&LogFrom { attempt_id: String::from("my-attempt-id"), routing_key: String::from("my-routing-key"), }) @@ -211,11 +262,11 @@ mod tests { } #[test] - fn test_path_for_malicious() { + fn test_path_for_log_malicious() { let p = TestScratch::new_dir("log-message-collector-for_malicious"); let worker = make_worker(p.path()); - let path = worker.path_for(&LogFrom { + let path = worker.path_for_log(&LogFrom { attempt_id: String::from("./../../"), routing_key: String::from("./../../foobar"), }); @@ -242,52 +293,75 @@ mod tests { #[test] - fn test_open_log() { - let p = TestScratch::new_dir("log-message-collector-open_log"); + fn test_open_file() { + let p = TestScratch::new_dir("log-message-collector-open_file"); let worker = make_worker(p.path()); assert!( worker - .open_log(worker.path_for(&make_from("a")).unwrap()) + .open_file(worker.path_for_log(&make_from("a")).unwrap()) .is_ok() ); assert!( worker - .open_log(worker.path_for(&make_from("b.foo/123")).unwrap()) + .open_file(worker.path_for_log(&make_from("b.foo/123")).unwrap()) .is_ok() ); } #[test] pub fn test_logs_collect() { + let mut logmsg = BuildLogMsg { + attempt_id: String::from("my-attempt-id"), + identity: String::from("my-identity"), + system: String::from("foobar-x8664"), + line_number: 1, + output: String::from("line-1"), + }; let mut job = LogMessage { from: make_from("foo"), - message: BuildLogMsg { - attempt_id: String::from("my-attempt-id"), - identity: String::from("my-identity"), - system: String::from("foobar-x8664"), - line_number: 1, - output: String::from("line-1"), - }, + message: Right(logmsg.clone()), }; - let p = TestScratch::new_dir("log-message-collector-path_for"); + let p = TestScratch::new_dir("log-message-collector-path_for_log"); { let mut worker = make_worker(p.path()); + assert_eq!(vec![worker::Action::Ack], + worker.consumer(& + LogMessage { + from: make_from("foo"), + message: Left(BuildLogStart { + attempt_id: String::from("my-attempt-id"), + identity: String::from("my-identity"), + system: String::from("foobar-x8664"), + }) + } + ) + ); + assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); - job.message.line_number = 5; - job.message.output = String::from("line-5"); + logmsg.line_number = 5; + logmsg.output = String::from("line-5"); + job.message = Right(logmsg.clone()); assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); job.from.attempt_id = String::from("my-other-attempt"); - job.message.attempt_id = String::from("my-other-attempt"); - job.message.line_number = 3; - job.message.output = String::from("line-3"); + logmsg.attempt_id = String::from("my-other-attempt"); + logmsg.line_number = 3; + logmsg.output = String::from("line-3"); + job.message = Right(logmsg.clone()); assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); } + let mut pr = p.path(); + let mut s = String::new(); + pr.push("routing-key-foo/attempt-id-foo.metadata.json"); + File::open(pr).unwrap().read_to_string(&mut s).unwrap(); + assert_eq!(&s, "{\"system\":\"foobar-x8664\",\"identity\":\"my-identity\",\"attempt_id\":\"my-attempt-id\"}"); + + let mut pr = p.path(); let mut s = String::new(); pr.push("routing-key-foo/attempt-id-foo");