From fd54190f00e8b6d19ba1edd4026fa2c99d45aa05 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Thu, 8 Feb 2018 10:04:40 -0500 Subject: [PATCH] Initial stats work --- ofborg/Cargo.lock | 4 +- ofborg/Cargo.toml | 3 +- ofborg/src/bin/mass-rebuilder.rs | 5 +- ofborg/src/bin/stats.rs | 83 ++++++++++ ofborg/src/lib.rs | 4 + ofborg/src/stats.rs | 35 ++++- ofborg/src/tasks/massrebuilder.rs | 13 +- ofborg/src/tasks/mod.rs | 1 + ofborg/src/tasks/statscollector.rs | 241 +++++++++++++++++++++++++++++ ofborg/src/worker.rs | 4 +- 10 files changed, 374 insertions(+), 19 deletions(-) create mode 100644 ofborg/src/bin/stats.rs create mode 100644 ofborg/src/tasks/statscollector.rs diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index 092422c..56a1a46 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -390,7 +390,7 @@ dependencies = [ "log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", "lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", "md5 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", + "prometheus 0.3.11", "serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -434,7 +434,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "prometheus" version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", @@ -831,7 +830,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum openssl-sys 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "14ba54ac7d5a4eabd1d5f2c1fdeb7e7c14debfa669d94b983d01b465e767ba9e" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903" -"checksum prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "71a963c5a59b4459a8133e60f8170df6fd29b67c0e6de5d45521d3056465bbfc" "checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506" "checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4" "checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a" diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index 2966039..9fa8429 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -23,7 +23,8 @@ hubcaps = { git = "https://github.com/grahamc/hubcaps.git" } hyper = "0.10.*" hyper-native-tls = "0.2.4" lru-cache = "0.1.1" -prometheus = "0.3.11" +# prometheus = "0.3.11" +prometheus = { path = "../rust-prometheus/" } # for testing patches #[patch.crates-io] diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 8f0a5a5..d98d870 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -30,7 +30,10 @@ fn main() { let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let nix = cfg.nix(); - let events = stats::RabbitMQ::new(session.open_channel(3).unwrap()); + let events = stats::RabbitMQ::new( + &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), + session.open_channel(3).unwrap() + ); let mrw = tasks::massrebuilder::MassRebuildWorker::new( cloner, diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs new file mode 100644 index 0000000..257e819 --- /dev/null +++ b/ofborg/src/bin/stats.rs @@ -0,0 +1,83 @@ +extern crate hyper; +extern crate prometheus; +extern crate amqp; +extern crate ofborg; + +use std::env; +use ofborg::{easyamqp, tasks, worker, config, stats}; + +use amqp::Basic; +use ofborg::easyamqp::TypedWrappers; +use hyper::header::ContentType; +use hyper::mime::Mime; +use hyper::server::{Request, Response, Server}; +use prometheus::{Counter, Encoder, Gauge, HistogramVec, TextEncoder}; + +use std::thread; +use std::time::Duration; + +fn main() { + let cfg = config::load(env::args().nth(1).unwrap().as_ref()); + ofborg::setup_log(); + + println!("Hello, world!"); + + + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); + println!("Connected to rabbitmq"); + + let events = stats::RabbitMQ::new( + &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), + session.open_channel(3).unwrap() + ); + + let collector = tasks::statscollector::StatCollectorWorker::new( + events + ); + + let mut channel = session.open_channel(1).unwrap(); + + channel.basic_prefetch(1).unwrap(); + channel + .consume( + worker::new(collector), + easyamqp::ConsumeConfig { + queue: "sample-stats-events".to_owned(), + consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None, + }, + ) + .unwrap(); + + + thread::spawn(||{ + let encoder = TextEncoder::new(); + let addr = "127.0.0.1:9898"; + println!("listening addr {:?}", addr); + Server::http(addr) + .unwrap() + .handle(move |_: Request, mut res: Response| { + let metric_familys = prometheus::gather(); + let mut buffer = vec![]; + encoder.encode(&metric_familys, &mut buffer).unwrap(); + res.headers_mut() + .set(ContentType(encoder.format_type().parse::().unwrap())); + res.send(&buffer).unwrap(); + }) + .unwrap(); + }); + + + channel.start_consuming(); + + println!("Finished consuming?"); + + channel.close(200, "Bye").unwrap(); + println!("Closed the channel"); + session.close(200, "Good Bye"); + println!("Closed the session... EOF"); +} diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index bda70db..12f3430 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -1,3 +1,7 @@ +#[macro_use] +extern crate prometheus; + + #[macro_use] extern crate serde_derive; extern crate serde; diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index 87c8dc0..3c779a4 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -1,23 +1,43 @@ +use serde_json; use amqp::Channel; use amqp::protocol::basic::BasicProperties; use amqp::Basic; -pub trait SysEvents { - fn tick(&mut self, name: &str); +pub trait SysEvents: Send { + fn notify(&mut self, event: Event); +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all="kebab-case")] +pub enum Event { + StatCollectorLegacyEvent, + StatCollectorBogusEvent, + JobReceived, + JobDecodeSuccess, + JobDecodeFailure, + IssueAlreadyClosed, + IssueFetchFailed, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct EventMessage { + pub sender: String, + pub events: Vec, } pub struct RabbitMQ { + identity: String, channel: Channel, } impl RabbitMQ { - pub fn new(channel: Channel) -> RabbitMQ { - RabbitMQ { channel: channel } + pub fn new(identity: &str, channel: Channel) -> RabbitMQ { + RabbitMQ { identity: identity.to_owned(), channel: channel } } } impl SysEvents for RabbitMQ { - fn tick(&mut self, name: &str) { + fn notify(&mut self, event: Event) { let props = BasicProperties { ..Default::default() }; self.channel .basic_publish( @@ -26,7 +46,10 @@ impl SysEvents for RabbitMQ { false, false, props, - String::from(name).into_bytes(), + serde_json::to_string(&EventMessage { + sender: self.identity.clone(), + events: vec![event], + }).unwrap().into_bytes(), ) .unwrap(); } diff --git a/ofborg/src/tasks/massrebuilder.rs b/ofborg/src/tasks/massrebuilder.rs index 155143b..0266bdf 100644 --- a/ofborg/src/tasks/massrebuilder.rs +++ b/ofborg/src/tasks/massrebuilder.rs @@ -14,6 +14,7 @@ use ofborg::nix::Nix; use ofborg::acl::ACL; use ofborg::stats; +use ofborg::stats::Event; use ofborg::worker; use ofborg::tagger::{StdenvTagger, RebuildTagger, PathsTagger, PkgsAddedRemovedTagger}; use ofborg::outpathdiff::{OutPaths, OutPathDiff}; @@ -73,7 +74,7 @@ impl MassRebuildWorker { } } -impl worker::SimpleWorker for MassRebuildWorker { +impl worker::SimpleWorker for MassRebuildWorker { type J = massrebuildjob::MassRebuildJob; fn msg_to_job( @@ -82,14 +83,14 @@ impl worker::SimpleWorker for MassRebuildWorker { _: &BasicProperties, body: &Vec, ) -> Result { - self.events.tick("job-received"); + self.events.notify(Event::JobReceived); return match massrebuildjob::from(body) { Ok(e) => { - self.events.tick("job-decode-success"); + self.events.notify(Event::JobDecodeSuccess); Ok(e) } Err(e) => { - self.events.tick("job-decode-failure"); + self.events.notify(Event::JobDecodeFailure); error!( "Failed to decode message: {:?}, Err: {:?}", String::from_utf8(body.clone()), @@ -113,7 +114,7 @@ impl worker::SimpleWorker for MassRebuildWorker { match issue.get() { Ok(iss) => { if iss.state == "closed" { - self.events.tick("issue-already-closed"); + self.events.notify(Event::IssueAlreadyClosed); info!("Skipping {} because it is closed", job.pr.number); return self.actions().skip(&job); } @@ -128,7 +129,7 @@ impl worker::SimpleWorker for MassRebuildWorker { } } Err(e) => { - self.events.tick("issue-fetch-failed"); + self.events.notify(Event::IssueFetchFailed); info!("Error fetching {}!", job.pr.number); info!("E: {:?}", e); return self.actions().skip(&job); diff --git a/ofborg/src/tasks/mod.rs b/ofborg/src/tasks/mod.rs index d735e60..663bab6 100644 --- a/ofborg/src/tasks/mod.rs +++ b/ofborg/src/tasks/mod.rs @@ -3,4 +3,5 @@ pub mod build; pub mod massrebuilder; pub mod githubcommentfilter; pub mod githubcommentposter; +pub mod statscollector; pub mod log_message_collector; diff --git a/ofborg/src/tasks/statscollector.rs b/ofborg/src/tasks/statscollector.rs new file mode 100644 index 0000000..94b988c --- /dev/null +++ b/ofborg/src/tasks/statscollector.rs @@ -0,0 +1,241 @@ +extern crate prometheus; +extern crate amqp; +extern crate env_logger; + +use serde_json; +use std::str::FromStr; +use ofborg::worker; +use ofborg::stats; +use amqp::protocol::basic::{Deliver, BasicProperties}; +use std::collections::HashMap; +use std::mem; +use std::thread; +use std::time::Duration; +use std::sync::Arc; +use std::sync::Mutex; + +pub struct StatCollectorWorker { + events: E, + counter_collectors: HashMap, +} + +impl StatCollectorWorker { + pub fn new(events: E) -> StatCollectorWorker { + let mut worker = StatCollectorWorker { + events: events, + counter_collectors: HashMap::new(), + }; + + let initial_events: Vec = vec![ + stats::Event::StatCollectorLegacyEvent, + stats::Event::StatCollectorBogusEvent, + stats::Event::JobReceived, + stats::Event::JobDecodeSuccess, + stats::Event::JobDecodeFailure, + stats::Event::IssueAlreadyClosed, + stats::Event::IssueFetchFailed, + ]; + for initial_event in initial_events { + match initial_event { + // WARNING + // BEFORE YOU ADD A NEW VARIANT HERE, ADD IT + // TO THE LIST ABOVE! + // + // EACH VARIANT MUST BE INITIALIZED PRIOR + // TO REPORTING STATS + stats::Event::StatCollectorLegacyEvent => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "stats_collector".to_owned(), + name: "legacy_event".to_owned(), + help: "Number of received legacy events".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::StatCollectorBogusEvent => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "stats_collector".to_owned(), + name: "bogus_event".to_owned(), + help: "Number of received unparseable events".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobReceived => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_received".to_owned(), + help: "Number of received worker jobs".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobDecodeSuccess => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_decode_successful".to_owned(), + help: "Number of successfully decoded jobs".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::JobDecodeFailure => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "generic_worker".to_owned(), + name: "job_decode_failure".to_owned(), + help: "Number of jobs which failed to parse".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + stats::Event::IssueAlreadyClosed => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "github".to_owned(), + name: "issue_closed".to_owned(), + help: "Number of jobs for issues which are already closed".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + + ); + }, + stats::Event::IssueFetchFailed => { + worker.register_counter( + &initial_event, + prometheus::Opts { + namespace: "ofborg".to_owned(), + subsystem: "github".to_owned(), + name: "issue_fetch_fail".to_owned(), + help: "Number of failed fetches for GitHub issues".to_owned(), + const_labels: HashMap::new(), + variable_labels: vec!["instance".to_owned()], + } + ); + }, + }; + } + + return worker; + } + + pub fn counter(&self, event: &stats::Event) -> prometheus::CounterVec { + let disc = format!("{:?}", mem::discriminant(event)); + self.counter_collectors.get(&disc).unwrap().clone() + } + + pub fn register_counter( + &mut self, + event: &stats::Event, + opts: prometheus::Opts, + ) { + let disc = format!("{:?}", mem::discriminant(event)); + let orig_labels = opts.variable_labels.clone(); + let labels: Vec<&str> = orig_labels + .iter() + .map(|v| v.as_ref()) + .collect(); + + let counter = register_counter_vec!( + opts, labels.as_ref() + ).unwrap(); + counter.with_label_values(&[""]).inc_by(0.0); + + self.counter_collectors.insert( + disc, + counter + ); + } +} + +impl worker::SimpleWorker for StatCollectorWorker { + type J = stats::EventMessage; + + fn msg_to_job( + &mut self, + _: &Deliver, + _: &BasicProperties, + body: &Vec, + ) -> Result { + return match serde_json::from_slice(body) { + Ok(e) => Ok(e), + Err(_) => { + let mut modified_body: Vec = vec!["\"".as_bytes()[0]]; + modified_body.append(&mut body.clone()); + modified_body.push("\"".as_bytes()[0]); + + match serde_json::from_slice(&modified_body) { + Ok(e) => { + self.events.notify(stats::Event::StatCollectorLegacyEvent); + Ok(stats::EventMessage { + sender: "".to_owned(), + events: vec![e], + }) + }, + Err(e) => { + self.events.notify(stats::Event::StatCollectorBogusEvent); + error!( + "Failed to decode message: {:?}, Err: {:?}", + String::from_utf8(body.clone()), + e + ); + Err("Failed to decode message".to_owned()) + } + } + } + }; + } + + fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { + let sender = job.sender.clone(); + for event in job.events.iter() { + match *event { + stats::Event::StatCollectorLegacyEvent => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::StatCollectorBogusEvent => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobReceived => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobDecodeSuccess => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::JobDecodeFailure => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::IssueAlreadyClosed => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + stats::Event::IssueFetchFailed => { + self.counter(&event).with_label_values(&[sender.as_ref()]).inc(); + }, + } + } + + return vec![worker::Action::Ack]; + } +} diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 0b18e2c..62922ae 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -54,8 +54,8 @@ where }); } -pub trait SimpleWorker { - type J; +pub trait SimpleWorker: Send + 'static { + type J: Send; fn consumer(&mut self, job: &Self::J) -> Actions;