Store metadata with logs

This commit is contained in:
Graham Christensen 2018-02-21 21:36:27 -05:00
parent 918099b196
commit aba3ad04e5
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
6 changed files with 138 additions and 37 deletions

View file

@ -182,6 +182,13 @@ rec {
sha256 = "1bxsh6fags7nr36vlz07ik2a1rzyipc8x1y30kjk832hf2pzadmw"; sha256 = "1bxsh6fags7nr36vlz07ik2a1rzyipc8x1y30kjk832hf2pzadmw";
inherit dependencies buildDependencies features; inherit dependencies buildDependencies features;
}; };
either_1_4_0_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate {
crateName = "either";
version = "1.4.0";
authors = [ "bluss" ];
sha256 = "04kpfd84lvyrkb2z4sljlz2d3d5qczd0sb1yy37fgijq2yx3vb37";
inherit dependencies buildDependencies features;
};
enum_primitive_0_1_1_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { enum_primitive_0_1_1_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate {
crateName = "enum_primitive"; crateName = "enum_primitive";
version = "0.1.1"; version = "0.1.1";
@ -974,6 +981,17 @@ rec {
dtoa_0_4_2_features = f: updateFeatures f (rec { dtoa_0_4_2_features = f: updateFeatures f (rec {
dtoa_0_4_2.default = (f.dtoa_0_4_2.default or true); dtoa_0_4_2.default = (f.dtoa_0_4_2.default or true);
}) []; }) [];
either_1_4_0 = { features?(either_1_4_0_features {}) }: either_1_4_0_ {
dependencies = mapFeatures features ([]);
features = mkFeatures (features.either_1_4_0 or {});
};
either_1_4_0_features = f: updateFeatures f (rec {
either_1_4_0.default = (f.either_1_4_0.default or true);
either_1_4_0.use_std =
(f.either_1_4_0.use_std or false) ||
(f.either_1_4_0.default or false) ||
(either_1_4_0.default or false);
}) [];
enum_primitive_0_1_1 = { features?(enum_primitive_0_1_1_features {}) }: enum_primitive_0_1_1_ { enum_primitive_0_1_1 = { features?(enum_primitive_0_1_1_features {}) }: enum_primitive_0_1_1_ {
dependencies = mapFeatures features ([ num_traits_0_1_41 ]); dependencies = mapFeatures features ([ num_traits_0_1_41 ]);
}; };
@ -1325,10 +1343,11 @@ rec {
num_cpus_1_8_0.default = (f.num_cpus_1_8_0.default or true); num_cpus_1_8_0.default = (f.num_cpus_1_8_0.default or true);
}) [ libc_0_2_36_features ]; }) [ libc_0_2_36_features ];
ofborg_0_1_1 = { features?(ofborg_0_1_1_features {}) }: ofborg_0_1_1_ { ofborg_0_1_1 = { features?(ofborg_0_1_1_features {}) }: ofborg_0_1_1_ {
dependencies = mapFeatures features ([ amqp_0_1_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]); dependencies = mapFeatures features ([ amqp_0_1_0 either_1_4_0 env_logger_0_4_3 fs2_0_4_3 hubcaps_0_3_16 hyper_0_10_13 hyper_native_tls_0_2_4 log_0_3_8 lru_cache_0_1_1 md5_0_3_6 serde_1_0_27 serde_derive_1_0_27 serde_json_1_0_9 tempfile_2_2_0 uuid_0_4_0 ]);
}; };
ofborg_0_1_1_features = f: updateFeatures f (rec { ofborg_0_1_1_features = f: updateFeatures f (rec {
amqp_0_1_0.default = true; amqp_0_1_0.default = true;
either_1_4_0.default = true;
env_logger_0_4_3.default = true; env_logger_0_4_3.default = true;
fs2_0_4_3.default = true; fs2_0_4_3.default = true;
hubcaps_0_3_16.default = true; hubcaps_0_3_16.default = true;
@ -1344,7 +1363,7 @@ rec {
tempfile_2_2_0.default = true; tempfile_2_2_0.default = true;
uuid_0_4_0.default = true; uuid_0_4_0.default = true;
uuid_0_4_0.v4 = true; uuid_0_4_0.v4 = true;
}) [ amqp_0_1_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ]; }) [ amqp_0_1_0_features either_1_4_0_features env_logger_0_4_3_features fs2_0_4_3_features hubcaps_0_3_16_features hyper_0_10_13_features hyper_native_tls_0_2_4_features log_0_3_8_features lru_cache_0_1_1_features md5_0_3_6_features serde_1_0_27_features serde_derive_1_0_27_features serde_json_1_0_9_features tempfile_2_2_0_features uuid_0_4_0_features ];
openssl_0_9_23 = { features?(openssl_0_9_23_features {}) }: openssl_0_9_23_ { openssl_0_9_23 = { features?(openssl_0_9_23_features {}) }: openssl_0_9_23_ {
dependencies = mapFeatures features ([ bitflags_0_9_1 foreign_types_0_3_2 lazy_static_1_0_0 libc_0_2_36 openssl_sys_0_9_24 ]); dependencies = mapFeatures features ([ bitflags_0_9_1 foreign_types_0_3_2 lazy_static_1_0_0 libc_0_2_36 openssl_sys_0_9_24 ]);
features = mkFeatures (features.openssl_0_9_23 or {}); features = mkFeatures (features.openssl_0_9_23 or {});

7
ofborg/Cargo.lock generated
View file

@ -132,6 +132,11 @@ name = "dtoa"
version = "0.4.2" version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "either"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]] [[package]]
name = "enum_primitive" name = "enum_primitive"
version = "0.1.1" version = "0.1.1"
@ -377,6 +382,7 @@ name = "ofborg"
version = "0.1.1" version = "0.1.1"
dependencies = [ dependencies = [
"amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git)", "amqp 0.1.0 (git+https://github.com/grahamc/rust-amqp.git)",
"either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "fs2 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git)", "hubcaps 0.3.16 (git+https://github.com/grahamc/hubcaps.git)",
@ -761,6 +767,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67" "checksum core-foundation 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "25bfd746d203017f7d5cbd31ee5d8e17f94b6521c7af77ece6c9e4b2d4b16c67"
"checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d" "checksum core-foundation-sys 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "065a5d7ffdcbc8fa145d6f0746f3555025b9097a9e9cda59f7467abae670c78d"
"checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab" "checksum dtoa 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "09c3753c3db574d215cba4ea76018483895d7bff25a31b49ba45db21c48e50ab"
"checksum either 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "740178ddf48b1a9e878e6d6509a1442a2d42fd2928aae8e7a6f8a36fb01981b3"
"checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180" "checksum enum_primitive 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "be4551092f4d519593039259a9ed8daedf0da12e5109c5280338073eaeb81180"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b" "checksum env_logger 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3ddf21e73e016298f5cb37d6ef8e8da8e39f91f9ec8b0df44b7deb16a9f8cd5b"

View file

@ -7,6 +7,7 @@ build = "build.rs"
[dependencies] [dependencies]
either = "1.4.0"
log = "= 0.3.8" log = "= 0.3.8"
env_logger = "= 0.4.3" env_logger = "= 0.4.3"
# amqp = { path = "./rust-amqp/" } # for testing patches # amqp = { path = "./rust-amqp/" } # for testing patches

View file

@ -9,7 +9,7 @@ extern crate log;
extern crate hubcaps; extern crate hubcaps;
extern crate hyper; extern crate hyper;
extern crate hyper_native_tls; extern crate hyper_native_tls;
extern crate either;
extern crate lru_cache; extern crate lru_cache;
extern crate tempfile; extern crate tempfile;
extern crate amqp; extern crate amqp;

View file

@ -1,5 +1,5 @@
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BuildLogMsg { pub struct BuildLogMsg {
pub system: String, pub system: String,
pub identity: String, pub identity: String,
@ -8,7 +8,7 @@ pub struct BuildLogMsg {
pub output: String, pub output: String,
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct BuildLogStart { pub struct BuildLogStart {
pub system: String, pub system: String,
pub identity: String, pub identity: String,

View file

@ -1,14 +1,16 @@
extern crate amqp; extern crate amqp;
extern crate env_logger; extern crate env_logger;
use either::{Either, Left, Right};
use lru_cache::LruCache; use lru_cache::LruCache;
use serde_json; use serde_json;
use std::fs; use std::fs;
use std::fs::{OpenOptions, File}; use std::fs::{OpenOptions, File};
use std::path::{Component, PathBuf}; use std::path::{Component, PathBuf};
use std::io::Write;
use ofborg::writetoline::LineWriter; use ofborg::writetoline::LineWriter;
use ofborg::message::buildlogmsg::BuildLogMsg; use ofborg::message::buildlogmsg::{BuildLogStart, BuildLogMsg};
use ofborg::worker; use ofborg::worker;
use amqp::protocol::basic::{Deliver, BasicProperties}; use amqp::protocol::basic::{Deliver, BasicProperties};
@ -26,7 +28,7 @@ pub struct LogMessageCollector {
#[derive(Debug)] #[derive(Debug)]
pub struct LogMessage { pub struct LogMessage {
from: LogFrom, from: LogFrom,
message: BuildLogMsg, message: Either<BuildLogStart, BuildLogMsg>,
} }
fn validate_path_segment(segment: &PathBuf) -> Result<(), String> { fn validate_path_segment(segment: &PathBuf) -> Result<(), String> {
@ -58,14 +60,32 @@ impl LogMessageCollector {
}; };
} }
pub fn write_metadata(&mut self, from: &LogFrom, data: &BuildLogStart) -> Result<(), String>{
let metapath = self.path_for_metadata(&from)?;
let mut fp = self.open_file(metapath)?;
match serde_json::to_string(data) {
Ok(data) => {
if let Err(e) = fp.write(&data.as_bytes()) {
Err(format!("Failed to write metadata: {:?}", e))
} else {
Ok(())
}
},
Err(e) => {
Err(format!("Failed to stringify metadata: {:?}", e))
}
}
}
pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut LineWriter, String> { pub fn handle_for(&mut self, from: &LogFrom) -> Result<&mut LineWriter, String> {
if self.handles.contains_key(&from) { if self.handles.contains_key(&from) {
return Ok(self.handles.get_mut(&from).expect( return Ok(self.handles.get_mut(&from).expect(
"handles just contained the key", "handles just contained the key",
)); ));
} else { } else {
let logpath = self.path_for(&from)?; let logpath = self.path_for_log(&from)?;
let fp = self.open_log(logpath)?; let fp = self.open_file(logpath)?;
let writer = LineWriter::new(fp); let writer = LineWriter::new(fp);
self.handles.insert(from.clone(), writer); self.handles.insert(from.clone(), writer);
if let Some(handle) = self.handles.get_mut(&from) { if let Some(handle) = self.handles.get_mut(&from) {
@ -78,7 +98,13 @@ impl LogMessageCollector {
} }
} }
fn path_for(&self, from: &LogFrom) -> Result<PathBuf, String> { fn path_for_metadata(&self, from: &LogFrom) -> Result<PathBuf, String> {
let mut path = self.path_for_log(from)?;
path.set_extension("metadata.json");
return Ok(path);
}
fn path_for_log(&self, from: &LogFrom) -> Result<PathBuf, String> {
let mut location = self.log_root.clone(); let mut location = self.log_root.clone();
let routing_key = PathBuf::from(from.routing_key.clone()); let routing_key = PathBuf::from(from.routing_key.clone());
@ -100,7 +126,7 @@ impl LogMessageCollector {
} }
} }
fn open_log(&self, path: PathBuf) -> Result<File, String> { fn open_file(&self, path: PathBuf) -> Result<File, String> {
let dir = path.parent().unwrap(); let dir = path.parent().unwrap();
fs::create_dir_all(dir).unwrap(); fs::create_dir_all(dir).unwrap();
@ -114,7 +140,7 @@ impl LogMessageCollector {
match attempt { match attempt {
Ok(handle) => Ok(handle), Ok(handle) => Ok(handle),
Err(e) => Err(format!( Err(e) => Err(format!(
"Failed to open the log file for {:?}, err: {:?}", "Failed to open the file for {:?}, err: {:?}",
&path, &path,
e e
)), )),
@ -144,14 +170,22 @@ impl worker::SimpleWorker for LogMessageCollector {
routing_key: deliver.routing_key.clone(), routing_key: deliver.routing_key.clone(),
attempt_id: message.attempt_id.clone(), attempt_id: message.attempt_id.clone(),
}, },
message: message, message: Right(message),
}) })
} }
fn consumer(&mut self, job: &LogMessage) -> worker::Actions { fn consumer(&mut self, job: &LogMessage) -> worker::Actions {
let handle = self.handle_for(&job.from).unwrap(); match job.message {
Left(ref start) => {
self.write_metadata(&job.from, &start).expect("failed to write metadata");
},
Right(ref message) => {
let handle = self.handle_for(&job.from).unwrap();
handle.write_to_line((job.message.line_number - 1) as usize, &job.message.output); handle.write_to_line((message.line_number - 1) as usize,
&message.output);
}
}
return vec![worker::Action::Ack]; return vec![worker::Action::Ack];
} }
@ -194,12 +228,29 @@ mod tests {
} }
#[test] #[test]
fn test_path_for() { fn test_path_for_metadata() {
let p = TestScratch::new_dir("log-message-collector-path_for"); let p = TestScratch::new_dir("log-message-collector-path_for_metadata");
let worker = make_worker(p.path()); let worker = make_worker(p.path());
let path = worker let path = worker
.path_for(&LogFrom { .path_for_metadata(&LogFrom {
attempt_id: String::from("my-attempt-id"),
routing_key: String::from("my-routing-key"),
})
.expect("the path should be valid");
assert!(path.starts_with(p.path()));
assert!(path.as_os_str().to_string_lossy().ends_with("my-routing-key/my-attempt-id.metadata.json"));
}
#[test]
fn test_path_for_log() {
let p = TestScratch::new_dir("log-message-collector-path_for_log");
let worker = make_worker(p.path());
let path = worker
.path_for_log(&LogFrom {
attempt_id: String::from("my-attempt-id"), attempt_id: String::from("my-attempt-id"),
routing_key: String::from("my-routing-key"), routing_key: String::from("my-routing-key"),
}) })
@ -211,11 +262,11 @@ mod tests {
} }
#[test] #[test]
fn test_path_for_malicious() { fn test_path_for_log_malicious() {
let p = TestScratch::new_dir("log-message-collector-for_malicious"); let p = TestScratch::new_dir("log-message-collector-for_malicious");
let worker = make_worker(p.path()); let worker = make_worker(p.path());
let path = worker.path_for(&LogFrom { let path = worker.path_for_log(&LogFrom {
attempt_id: String::from("./../../"), attempt_id: String::from("./../../"),
routing_key: String::from("./../../foobar"), routing_key: String::from("./../../foobar"),
}); });
@ -242,52 +293,75 @@ mod tests {
#[test] #[test]
fn test_open_log() { fn test_open_file() {
let p = TestScratch::new_dir("log-message-collector-open_log"); let p = TestScratch::new_dir("log-message-collector-open_file");
let worker = make_worker(p.path()); let worker = make_worker(p.path());
assert!( assert!(
worker worker
.open_log(worker.path_for(&make_from("a")).unwrap()) .open_file(worker.path_for_log(&make_from("a")).unwrap())
.is_ok() .is_ok()
); );
assert!( assert!(
worker worker
.open_log(worker.path_for(&make_from("b.foo/123")).unwrap()) .open_file(worker.path_for_log(&make_from("b.foo/123")).unwrap())
.is_ok() .is_ok()
); );
} }
#[test] #[test]
pub fn test_logs_collect() { pub fn test_logs_collect() {
let mut logmsg = BuildLogMsg {
attempt_id: String::from("my-attempt-id"),
identity: String::from("my-identity"),
system: String::from("foobar-x8664"),
line_number: 1,
output: String::from("line-1"),
};
let mut job = LogMessage { let mut job = LogMessage {
from: make_from("foo"), from: make_from("foo"),
message: BuildLogMsg { message: Right(logmsg.clone()),
attempt_id: String::from("my-attempt-id"),
identity: String::from("my-identity"),
system: String::from("foobar-x8664"),
line_number: 1,
output: String::from("line-1"),
},
}; };
let p = TestScratch::new_dir("log-message-collector-path_for"); let p = TestScratch::new_dir("log-message-collector-path_for_log");
{ {
let mut worker = make_worker(p.path()); let mut worker = make_worker(p.path());
assert_eq!(vec![worker::Action::Ack],
worker.consumer(&
LogMessage {
from: make_from("foo"),
message: Left(BuildLogStart {
attempt_id: String::from("my-attempt-id"),
identity: String::from("my-identity"),
system: String::from("foobar-x8664"),
})
}
)
);
assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); assert_eq!(vec![worker::Action::Ack], worker.consumer(&job));
job.message.line_number = 5; logmsg.line_number = 5;
job.message.output = String::from("line-5"); logmsg.output = String::from("line-5");
job.message = Right(logmsg.clone());
assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); assert_eq!(vec![worker::Action::Ack], worker.consumer(&job));
job.from.attempt_id = String::from("my-other-attempt"); job.from.attempt_id = String::from("my-other-attempt");
job.message.attempt_id = String::from("my-other-attempt"); logmsg.attempt_id = String::from("my-other-attempt");
job.message.line_number = 3; logmsg.line_number = 3;
job.message.output = String::from("line-3"); logmsg.output = String::from("line-3");
job.message = Right(logmsg.clone());
assert_eq!(vec![worker::Action::Ack], worker.consumer(&job)); assert_eq!(vec![worker::Action::Ack], worker.consumer(&job));
} }
let mut pr = p.path();
let mut s = String::new();
pr.push("routing-key-foo/attempt-id-foo.metadata.json");
File::open(pr).unwrap().read_to_string(&mut s).unwrap();
assert_eq!(&s, "{\"system\":\"foobar-x8664\",\"identity\":\"my-identity\",\"attempt_id\":\"my-attempt-id\"}");
let mut pr = p.path(); let mut pr = p.path();
let mut s = String::new(); let mut s = String::new();
pr.push("routing-key-foo/attempt-id-foo"); pr.push("routing-key-foo/attempt-id-foo");