Initial stats work

This commit is contained in:
Graham Christensen 2018-02-08 10:04:40 -05:00
parent 7d16879d63
commit fd54190f00
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
10 changed files with 374 additions and 19 deletions

4
ofborg/Cargo.lock generated
View file

@ -390,7 +390,7 @@ dependencies = [
"log 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
"lru-cache 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"md5 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)",
"prometheus 0.3.11",
"serde 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.27 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -434,7 +434,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "prometheus"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"cfg-if 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
"fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
@ -831,7 +830,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum openssl-sys 0.9.24 (registry+https://github.com/rust-lang/crates.io-index)" = "14ba54ac7d5a4eabd1d5f2c1fdeb7e7c14debfa669d94b983d01b465e767ba9e"
"checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831"
"checksum pkg-config 0.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "3a8b4c6b8165cd1a1cd4b9b120978131389f64bdaf456435caa41e630edba903"
"checksum prometheus 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)" = "71a963c5a59b4459a8133e60f8170df6fd29b67c0e6de5d45521d3056465bbfc"
"checksum protobuf 1.4.3 (registry+https://github.com/rust-lang/crates.io-index)" = "bec26e67194b7d991908145fdf21b7cae8b08423d96dcb9e860cd31f854b9506"
"checksum quick-error 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7ac990ab4e038dd8481a5e3fd00641067fcfc674ad663f3222752ed5284e05d4"
"checksum quote 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e920b65c65f10b2ae65c831a81a073a89edd28c7cce89475bff467ab4167a"

View file

@ -23,7 +23,8 @@ hubcaps = { git = "https://github.com/grahamc/hubcaps.git" }
hyper = "0.10.*"
hyper-native-tls = "0.2.4"
lru-cache = "0.1.1"
prometheus = "0.3.11"
# prometheus = "0.3.11"
prometheus = { path = "../rust-prometheus/" } # for testing patches
#[patch.crates-io]

View file

@ -30,7 +30,10 @@ fn main() {
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix();
let events = stats::RabbitMQ::new(session.open_channel(3).unwrap());
let events = stats::RabbitMQ::new(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap()
);
let mrw = tasks::massrebuilder::MassRebuildWorker::new(
cloner,

83
ofborg/src/bin/stats.rs Normal file
View file

@ -0,0 +1,83 @@
extern crate hyper;
extern crate prometheus;
extern crate amqp;
extern crate ofborg;
use std::env;
use ofborg::{easyamqp, tasks, worker, config, stats};
use amqp::Basic;
use ofborg::easyamqp::TypedWrappers;
use hyper::header::ContentType;
use hyper::mime::Mime;
use hyper::server::{Request, Response, Server};
use prometheus::{Counter, Encoder, Gauge, HistogramVec, TextEncoder};
use std::thread;
use std::time::Duration;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
ofborg::setup_log();
println!("Hello, world!");
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let events = stats::RabbitMQ::new(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap()
);
let collector = tasks::statscollector::StatCollectorWorker::new(
events
);
let mut channel = session.open_channel(1).unwrap();
channel.basic_prefetch(1).unwrap();
channel
.consume(
worker::new(collector),
easyamqp::ConsumeConfig {
queue: "sample-stats-events".to_owned(),
consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
thread::spawn(||{
let encoder = TextEncoder::new();
let addr = "127.0.0.1:9898";
println!("listening addr {:?}", addr);
Server::http(addr)
.unwrap()
.handle(move |_: Request, mut res: Response| {
let metric_familys = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_familys, &mut buffer).unwrap();
res.headers_mut()
.set(ContentType(encoder.format_type().parse::<Mime>().unwrap()));
res.send(&buffer).unwrap();
})
.unwrap();
});
channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye");
println!("Closed the session... EOF");
}

View file

@ -1,3 +1,7 @@
#[macro_use]
extern crate prometheus;
#[macro_use]
extern crate serde_derive;
extern crate serde;

View file

@ -1,23 +1,43 @@
use serde_json;
use amqp::Channel;
use amqp::protocol::basic::BasicProperties;
use amqp::Basic;
pub trait SysEvents {
fn tick(&mut self, name: &str);
pub trait SysEvents: Send {
fn notify(&mut self, event: Event);
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all="kebab-case")]
pub enum Event {
StatCollectorLegacyEvent,
StatCollectorBogusEvent,
JobReceived,
JobDecodeSuccess,
JobDecodeFailure,
IssueAlreadyClosed,
IssueFetchFailed,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EventMessage {
pub sender: String,
pub events: Vec<Event>,
}
pub struct RabbitMQ {
identity: String,
channel: Channel,
}
impl RabbitMQ {
pub fn new(channel: Channel) -> RabbitMQ {
RabbitMQ { channel: channel }
pub fn new(identity: &str, channel: Channel) -> RabbitMQ {
RabbitMQ { identity: identity.to_owned(), channel: channel }
}
}
impl SysEvents for RabbitMQ {
fn tick(&mut self, name: &str) {
fn notify(&mut self, event: Event) {
let props = BasicProperties { ..Default::default() };
self.channel
.basic_publish(
@ -26,7 +46,10 @@ impl SysEvents for RabbitMQ {
false,
false,
props,
String::from(name).into_bytes(),
serde_json::to_string(&EventMessage {
sender: self.identity.clone(),
events: vec![event],
}).unwrap().into_bytes(),
)
.unwrap();
}

View file

@ -14,6 +14,7 @@ use ofborg::nix::Nix;
use ofborg::acl::ACL;
use ofborg::stats;
use ofborg::stats::Event;
use ofborg::worker;
use ofborg::tagger::{StdenvTagger, RebuildTagger, PathsTagger, PkgsAddedRemovedTagger};
use ofborg::outpathdiff::{OutPaths, OutPathDiff};
@ -73,7 +74,7 @@ impl<E: stats::SysEvents> MassRebuildWorker<E> {
}
}
impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for MassRebuildWorker<E> {
type J = massrebuildjob::MassRebuildJob;
fn msg_to_job(
@ -82,14 +83,14 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
self.events.tick("job-received");
self.events.notify(Event::JobReceived);
return match massrebuildjob::from(body) {
Ok(e) => {
self.events.tick("job-decode-success");
self.events.notify(Event::JobDecodeSuccess);
Ok(e)
}
Err(e) => {
self.events.tick("job-decode-failure");
self.events.notify(Event::JobDecodeFailure);
error!(
"Failed to decode message: {:?}, Err: {:?}",
String::from_utf8(body.clone()),
@ -113,7 +114,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
match issue.get() {
Ok(iss) => {
if iss.state == "closed" {
self.events.tick("issue-already-closed");
self.events.notify(Event::IssueAlreadyClosed);
info!("Skipping {} because it is closed", job.pr.number);
return self.actions().skip(&job);
}
@ -128,7 +129,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
}
}
Err(e) => {
self.events.tick("issue-fetch-failed");
self.events.notify(Event::IssueFetchFailed);
info!("Error fetching {}!", job.pr.number);
info!("E: {:?}", e);
return self.actions().skip(&job);

View file

@ -3,4 +3,5 @@ pub mod build;
pub mod massrebuilder;
pub mod githubcommentfilter;
pub mod githubcommentposter;
pub mod statscollector;
pub mod log_message_collector;

View file

@ -0,0 +1,241 @@
extern crate prometheus;
extern crate amqp;
extern crate env_logger;
use serde_json;
use std::str::FromStr;
use ofborg::worker;
use ofborg::stats;
use amqp::protocol::basic::{Deliver, BasicProperties};
use std::collections::HashMap;
use std::mem;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::Mutex;
pub struct StatCollectorWorker<E> {
events: E,
counter_collectors: HashMap<String, prometheus::CounterVec>,
}
impl<E: stats::SysEvents + 'static> StatCollectorWorker<E> {
pub fn new(events: E) -> StatCollectorWorker<E> {
let mut worker = StatCollectorWorker {
events: events,
counter_collectors: HashMap::new(),
};
let initial_events: Vec<stats::Event> = vec![
stats::Event::StatCollectorLegacyEvent,
stats::Event::StatCollectorBogusEvent,
stats::Event::JobReceived,
stats::Event::JobDecodeSuccess,
stats::Event::JobDecodeFailure,
stats::Event::IssueAlreadyClosed,
stats::Event::IssueFetchFailed,
];
for initial_event in initial_events {
match initial_event {
// WARNING
// BEFORE YOU ADD A NEW VARIANT HERE, ADD IT
// TO THE LIST ABOVE!
//
// EACH VARIANT MUST BE INITIALIZED PRIOR
// TO REPORTING STATS
stats::Event::StatCollectorLegacyEvent => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "stats_collector".to_owned(),
name: "legacy_event".to_owned(),
help: "Number of received legacy events".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::StatCollectorBogusEvent => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "stats_collector".to_owned(),
name: "bogus_event".to_owned(),
help: "Number of received unparseable events".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::JobReceived => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "generic_worker".to_owned(),
name: "job_received".to_owned(),
help: "Number of received worker jobs".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::JobDecodeSuccess => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "generic_worker".to_owned(),
name: "job_decode_successful".to_owned(),
help: "Number of successfully decoded jobs".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::JobDecodeFailure => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "generic_worker".to_owned(),
name: "job_decode_failure".to_owned(),
help: "Number of jobs which failed to parse".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::IssueAlreadyClosed => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "github".to_owned(),
name: "issue_closed".to_owned(),
help: "Number of jobs for issues which are already closed".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
stats::Event::IssueFetchFailed => {
worker.register_counter(
&initial_event,
prometheus::Opts {
namespace: "ofborg".to_owned(),
subsystem: "github".to_owned(),
name: "issue_fetch_fail".to_owned(),
help: "Number of failed fetches for GitHub issues".to_owned(),
const_labels: HashMap::new(),
variable_labels: vec!["instance".to_owned()],
}
);
},
};
}
return worker;
}
pub fn counter(&self, event: &stats::Event) -> prometheus::CounterVec {
let disc = format!("{:?}", mem::discriminant(event));
self.counter_collectors.get(&disc).unwrap().clone()
}
pub fn register_counter(
&mut self,
event: &stats::Event,
opts: prometheus::Opts,
) {
let disc = format!("{:?}", mem::discriminant(event));
let orig_labels = opts.variable_labels.clone();
let labels: Vec<&str> = orig_labels
.iter()
.map(|v| v.as_ref())
.collect();
let counter = register_counter_vec!(
opts, labels.as_ref()
).unwrap();
counter.with_label_values(&[""]).inc_by(0.0);
self.counter_collectors.insert(
disc,
counter
);
}
}
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> {
type J = stats::EventMessage;
fn msg_to_job(
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(_) => {
let mut modified_body: Vec<u8> = vec!["\"".as_bytes()[0]];
modified_body.append(&mut body.clone());
modified_body.push("\"".as_bytes()[0]);
match serde_json::from_slice(&modified_body) {
Ok(e) => {
self.events.notify(stats::Event::StatCollectorLegacyEvent);
Ok(stats::EventMessage {
sender: "".to_owned(),
events: vec![e],
})
},
Err(e) => {
self.events.notify(stats::Event::StatCollectorBogusEvent);
error!(
"Failed to decode message: {:?}, Err: {:?}",
String::from_utf8(body.clone()),
e
);
Err("Failed to decode message".to_owned())
}
}
}
};
}
fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions {
let sender = job.sender.clone();
for event in job.events.iter() {
match *event {
stats::Event::StatCollectorLegacyEvent => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::StatCollectorBogusEvent => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::JobReceived => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::JobDecodeSuccess => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::JobDecodeFailure => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::IssueAlreadyClosed => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
stats::Event::IssueFetchFailed => {
self.counter(&event).with_label_values(&[sender.as_ref()]).inc();
},
}
}
return vec![worker::Action::Ack];
}
}

View file

@ -54,8 +54,8 @@ where
});
}
pub trait SimpleWorker {
type J;
pub trait SimpleWorker: Send + 'static {
type J: Send;
fn consumer(&mut self, job: &Self::J) -> Actions;