Merge pull request #72 from NixOS/stats

Stats
This commit is contained in:
Graham Christensen 2018-02-21 19:20:02 -05:00 committed by GitHub
commit d38c703844
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 825 additions and 20 deletions

View file

@ -410,7 +410,8 @@ rec {
crateName = "ofborg"; crateName = "ofborg";
version = "0.1.1"; version = "0.1.1";
authors = [ "Graham Christensen <graham@grahamc.com>" ]; authors = [ "Graham Christensen <graham@grahamc.com>" ];
src = include [ "Cargo.toml" "Cargo.lock" "src" "test-srcs" ] ./../ofborg; src = include [ "Cargo.toml" "Cargo.lock" "src" "test-srcs" "build.rs" ] ./../ofborg;
build = "build.rs";
inherit dependencies buildDependencies features; inherit dependencies buildDependencies features;
}; };
openssl_0_9_23_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate { openssl_0_9_23_ = { dependencies?[], buildDependencies?[], features?[] }: buildRustCrate {

View file

@ -2,8 +2,8 @@
name = "ofborg" name = "ofborg"
version = "0.1.1" version = "0.1.1"
authors = ["Graham Christensen <graham@grahamc.com>"] authors = ["Graham Christensen <graham@grahamc.com>"]
include = ["Cargo.toml", "Cargo.lock", "src", "test-srcs"] include = ["Cargo.toml", "Cargo.lock", "src", "test-srcs", "build.rs"]
build = "build.rs"
[dependencies] [dependencies]
@ -24,6 +24,5 @@ hyper = "0.10.*"
hyper-native-tls = "0.2.4" hyper-native-tls = "0.2.4"
lru-cache = "0.1.1" lru-cache = "0.1.1"
#[patch.crates-io] #[patch.crates-io]
#amq-proto = { path = "rust-amq-proto" } #amq-proto = { path = "rust-amq-proto" }

618
ofborg/build.rs Normal file
View file

@ -0,0 +1,618 @@
use std::env;
use std::fs::File;
use std::io::Write;
use std::path::Path;
enum MetricType {
Ticker(Metric),
Counter(Metric),
}
impl MetricType {
fn collector_type(&self) -> String {
match self {
&MetricType::Ticker(_) => {
String::from("u64")
}
&MetricType::Counter(_) => {
String::from("u64")
}
}
}
fn enum_matcher_types(&self) -> String {
let fields = self.enum_field_types();
if fields.len() > 0 {
format!("{}({})", self.variant(), fields.join(", "))
} else {
format!("{}", self.variant())
}
}
fn variant(&self) -> String {
match self {
&MetricType::Ticker(ref event) => {
event.variant.clone()
}
&MetricType::Counter(ref event) => {
event.variant.clone()
}
}
}
fn metric_type(&self) -> String {
match self {
&MetricType::Ticker(_) => {
String::from("counter")
}
&MetricType::Counter(_) => {
String::from("counter")
}
}
}
fn metric_name(&self) -> String {
match self {
&MetricType::Ticker(ref event) => {
event.metric_name.clone()
}
&MetricType::Counter(ref event) => {
event.metric_name.clone()
}
}
}
fn description(&self) -> String {
match self {
&MetricType::Ticker(ref event) => {
event.description.clone()
}
&MetricType::Counter(ref event) => {
event.description.clone()
}
}
}
fn enum_index_types(&self) -> Vec<String> {
let event: &Metric;
match self {
&MetricType::Ticker(ref i_event) => {
event = i_event;
}
&MetricType::Counter(ref i_event) => {
event = i_event;
}
}
let fields: Vec<String> = event.fields
.iter()
.map(|&(ref _fieldname, ref fieldtype)| fieldtype.clone())
.collect();
return fields
}
fn enum_field_types(&self) -> Vec<String> {
let mut extra_fields: Vec<String> = vec![];
match self {
&MetricType::Ticker(_) => {}
&MetricType::Counter(_) => {
extra_fields = vec![self.collector_type()];
}
}
let mut fields: Vec<String> = self.enum_index_types();
fields.append(&mut extra_fields);
return fields
}
fn enum_index_names(&self) -> Vec<String> {
let event: &Metric;
match self {
&MetricType::Ticker(ref i_event) => {
event = i_event;
}
&MetricType::Counter(ref i_event) => {
event = i_event;
}
}
let fields: Vec<String> = event.fields
.iter()
.map(|&(ref fieldname, ref _fieldtype)| fieldname.clone())
.collect();
return fields
}
fn enum_field_names(&self) -> Vec<String> {
let mut extra_fields: Vec<String> = vec![];
match self {
&MetricType::Ticker(_) => {}
&MetricType::Counter(_) => {
extra_fields = vec!["value".to_owned()];
}
}
let mut fields: Vec<String> = self.enum_index_names();
fields.append(&mut extra_fields);
return fields
}
fn record_value(&self) -> String {
match self {
&MetricType::Ticker(_) => {
String::from("1")
}
&MetricType::Counter(_) => {
String::from("value")
}
}
}
}
struct Metric {
variant: String,
fields: Vec<(String,String)>, // Vec because it is sorted
metric_name: String,
description: String,
}
fn name_to_parts(name: &str) -> Vec<String> {
let mut parts: Vec<String> = vec![];
let mut buf = String::from("");
for c in name.chars() {
if char::is_uppercase(c) && buf.len() > 0 {
parts.push(buf.to_owned());
buf = String::from("");
}
buf.push_str(&c.to_string());
}
if buf.len() > 0 {
parts.push(buf.to_owned());
std::mem::drop(buf);
}
return parts;
}
impl Metric {
pub fn ticker(name: &str, desc: &str, fields: Option<Vec<(&str,&str)>>) -> MetricType {
let parts = name_to_parts(name);
MetricType::Ticker(Metric {
variant: parts
.iter()
.map(|f| f.clone().to_owned())
.collect(),
fields: fields
.unwrap_or(vec![])
.iter()
.map(|&(ref fieldname, ref fieldtype)| (fieldname.clone().to_owned(), fieldtype.clone().to_owned()))
.collect(),
metric_name: parts.join("_").to_lowercase(),
description: desc.to_owned(),
})
}
pub fn counter(name: &str, desc: &str, fields: Option<Vec<(&str,&str)>>) -> MetricType {
let parts = name_to_parts(name);
MetricType::Counter(Metric {
variant: parts
.iter()
.map(|f| f.clone().to_owned())
.collect(),
fields: fields
.unwrap_or(vec![])
.iter()
.map(|&(ref fieldname, ref fieldtype)| (fieldname.clone().to_owned(), fieldtype.clone().to_owned()))
.collect(),
metric_name: parts.join("_").to_lowercase(),
description: desc.to_owned(),
})
}
}
fn events() -> Vec<MetricType> {
return vec![
Metric::ticker(
"StatCollectorLegacyEvent",
"Number of received legacy events",
Some(vec![("event", "String")]),
),
Metric::ticker(
"StatCollectorBogusEvent",
"Number of received unparseable events",
None,
),
Metric::ticker(
"JobReceived",
"Number of received worker jobs",
None,
),
Metric::counter(
"EvaluationDuration",
"Amount of time spent running evaluations",
Some(vec![
("branch", "String"),
]),
),
Metric::ticker(
"EvaluationDurationCount",
"Number of timed evaluations performed",
Some(vec![
("branch", "String"),
]),
),
Metric::ticker(
"TargetBranchFailsEvaluation",
"Number of PR evaluations which failed because the target branch failed",
Some(vec![
("branch", "String"),
]),
),
Metric::ticker(
"JobDecodeSuccess",
"Number of successfully decoded jobs",
None,
),
Metric::ticker(
"JobDecodeFailure",
"Number of jobs which failed to parse",
None,
),
Metric::ticker(
"IssueAlreadyClosed",
"Number of jobs for issues which are already closed",
None,
),
Metric::ticker(
"IssueFetchFailed",
"Number of failed fetches for GitHub issues",
None,
),
Metric::ticker(
"TaskEvaluationCheckComplete",
"Number of completed evaluation tasks",
None,
),
/*
Metric::counter(
"TimeElapsed",
"",
None
),
Metric::counter(
"EnvironmentsAllocatedCount",
"",
None
),
Metric::counter(
"EnvironmentsAllocatedBytes",
"",
None
),
Metric::counter(
"ListElementsCount",
"",
None
),
Metric::counter(
"ListElementsBytes",
"",
None
),
Metric::counter(
"ListConcatenations",
"",
None
),
Metric::counter(
"ValuesAllocatedCount",
"",
None
),
Metric::counter(
"ValuesAllocatedBytes",
"",
None
),
Metric::counter(
"SetsAllocatedCount",
"",
None
),
Metric::counter(
"SetsAllocatedBytes",
"",
None
),
Metric::counter(
"RightBiasedUnions",
"",
None
),
Metric::counter(
"ValuesCopiedInRightBiasedUnions",
"",
None
),
Metric::counter(
"SymbolsInSymbolTable",
"",
None
),
Metric::counter(
"SizeOfSymbolTable",
"",
None
),
Metric::counter(
"NumberOfThunks",
"",
None
),
Metric::counter(
"NumberOfThunksAvoided",
"",
None
),
Metric::counter(
"NumberOfAttrLookups",
"",
None
),
Metric::counter(
"NumberOfPrimopCalls",
"",
None
),
Metric::counter(
"NumberOfFunctionCalls",
"",
None
),
Metric::counter(
"TotalAllocations",
"",
None
),
Metric::counter(
"CurrentBoehmHeapSizeBytes",
"",
None
),
Metric::counter(
"TotalBoehmHeapAllocationsBytes",
"",
None
),
*/
];
}
fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
let dest_path = Path::new(&out_dir).join("events.rs");
let mut f = File::create(&dest_path).unwrap();
println!("cargo:rerun-if-changed=build.rs");
// Write the Event enum, which contains all possible event types
f.write_all(b"
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all=\"kebab-case\")]
pub enum Event {
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| format!(" {}", mtype.enum_matcher_types()) )
.collect();
f.write_all(variants.join(",\n").as_bytes()).unwrap();
f.write_all("\n}\n\n".as_bytes()).unwrap();
f.write_all(b"pub fn event_metric_name(event: &Event) -> String {
match event {
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| {
let fields: Vec<String> = mtype.enum_field_names()
.iter()
.map(|_| String::from("_"))
.collect();
let variant_match: String;
if fields.len() > 0 {
variant_match = format!(
"{}({})",
&mtype.variant(),
fields
.join(", "));
} else {
variant_match = format!("{}", &mtype.variant());
}
format!(" &Event::{} => String::from(\"{}\")",
&variant_match,
&mtype.metric_name(),
)
}).collect();
f.write_all(variants.join(",\n").as_bytes()).unwrap();
f.write_all("}\n }".as_bytes()).unwrap();
// Create a struct to hold all the possible metrics
f.write_all(b"
#[derive(Debug, Clone)]
pub struct MetricCollector {
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| {
let mut fields: Vec<String> = mtype.enum_index_types();
fields.push("String".to_owned()); // Instance
format!(" {}: Arc<Mutex<HashMap<({}),{}>>>",
mtype.metric_name(),
fields.join(", "),
mtype.collector_type(),
)
}).collect();
f.write_all(variants.join(",\n").as_bytes()).unwrap();
f.write_all("\n}\n\n".as_bytes()).unwrap();
// Create a struct to hold all the possible metrics
f.write_all(b"
impl MetricCollector {
pub fn new() -> MetricCollector {
MetricCollector {
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| {
let mut fields: Vec<String> = mtype.enum_field_types();
fields.push("String".to_owned()); // Instance
format!(" {}: Arc::new(Mutex::new(HashMap::new()))",
&mtype.metric_name(),
)
}).collect();
f.write_all(variants.join(",\n").as_bytes()).unwrap();
f.write_all("\n }\n".as_bytes()).unwrap();
f.write_all("\n }\n".as_bytes()).unwrap();
f.write_all(b"
pub fn record(&self, instance: String, event: Event) {
match event {
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| {
let fields: Vec<String> = mtype.enum_field_names();
let variant_match: String;
if fields.len() > 0 {
variant_match = format!("{}({})", &mtype.variant(), fields.join(", "));
} else {
variant_match = format!("{}", &mtype.variant());
}
let mut index_fields: Vec<String> = mtype.enum_index_names();
index_fields.push("instance".to_owned());
format!("
Event::{} => {{
let mut accum_table = self.{}
.lock()
.expect(\"Failed to unwrap metric mutex for {}\");
let accum = accum_table
.entry(({}))
.or_insert(0);
*accum += {};
}}
",
variant_match,
&mtype.metric_name(),
&mtype.metric_name(),
index_fields.join(", "),
&mtype.record_value(),
)
}).collect();
f.write_all(variants.join(",\n").as_bytes()).unwrap();
f.write_all("\n }\n".as_bytes()).unwrap();
f.write_all("\n }\n".as_bytes()).unwrap();
f.write_all(b"pub fn prometheus_output(&self) -> String {
let mut output = String::new();
").unwrap();
let variants: Vec<String> = events()
.iter()
.map(|mtype| {
let mut index_fields: Vec<String> = mtype.enum_index_names();
index_fields.push("instance".to_owned());
let ref_index_fields: Vec<String> = index_fields
.iter()
.map(|m| format!("ref {}", m))
.collect();
let for_matcher: String;
if index_fields.len() > 1 {
for_matcher = format!("({})",
ref_index_fields.join(", "));
} else {
for_matcher = ref_index_fields.join(", ");
}
let key_value_pairs: Vec<String> = index_fields
.iter()
.map(|name| format!(" format!(\"{}=\\\"{{}}\\\"\", {})", &name, &name))
.collect();
format!("
output.push_str(\"# HELP ofborg_{} {}\n\");
output.push_str(\"# TYPE ofborg_{} {}\n\");
let table = self.{}.lock()
.expect(\"Failed to unwrap metric mutex for {}\");
let values: Vec<String> = (*table)
.iter()
.map(|(&{}, value)| {{
let kvs: Vec<String> = vec![
{}
];
format!(\"ofborg_{}{{{{{{}}}}}} {{}}\", kvs.join(\",\"), value)
}})
.collect();
output.push_str(&values.join(\"\n\"));
output.push_str(\"\n\");
",
&mtype.metric_name(),
&mtype.description(),
&mtype.metric_name(),
&mtype.metric_type(),
&mtype.metric_name(),
&mtype.metric_name(),
for_matcher,
&key_value_pairs.join(",\n"),
&mtype.metric_name(),
)
}).collect();
f.write_all(variants.join("\n").as_bytes()).unwrap();
f.write_all("return output;\n }".as_bytes()).unwrap();
f.write_all("\n}".as_bytes()).unwrap();
}

View file

@ -30,7 +30,10 @@ fn main() {
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix(); let nix = cfg.nix();
let events = stats::RabbitMQ::new(session.open_channel(3).unwrap()); let events = stats::RabbitMQ::new(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap()
);
let mrw = tasks::massrebuilder::MassRebuildWorker::new( let mrw = tasks::massrebuilder::MassRebuildWorker::new(
cloner, cloner,

76
ofborg/src/bin/stats.rs Normal file
View file

@ -0,0 +1,76 @@
extern crate hyper;
extern crate amqp;
extern crate ofborg;
use std::env;
use ofborg::{easyamqp, tasks, worker, config, stats};
use amqp::Basic;
use ofborg::easyamqp::TypedWrappers;
use hyper::server::{Request, Response, Server};
use std::thread;
use std::time::Duration;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
ofborg::setup_log();
println!("Hello, world!");
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let events = stats::RabbitMQ::new(
&format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()),
session.open_channel(3).unwrap()
);
let metrics = stats::MetricCollector::new();
let collector = tasks::statscollector::StatCollectorWorker::new(
events,
metrics.clone(),
);
let mut channel = session.open_channel(1).unwrap();
channel.basic_prefetch(1).unwrap();
channel
.consume(
worker::new(collector),
easyamqp::ConsumeConfig {
queue: "sample-stats-events".to_owned(),
consumer_tag: format!("{}-prometheus-stats-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
thread::spawn(||{
let addr = "127.0.0.1:9898";
println!("listening addr {:?}", addr);
Server::http(addr)
.unwrap()
.handle(move |_: Request, res: Response| {
res.send(metrics.prometheus_output().as_bytes()).unwrap();
})
.unwrap();
});
channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye");
println!("Closed the session... EOF");
}

View file

@ -1,23 +1,40 @@
use serde_json;
use amqp::Channel; use amqp::Channel;
use amqp::protocol::basic::BasicProperties; use amqp::protocol::basic::BasicProperties;
use amqp::Basic; use amqp::Basic;
pub trait SysEvents { include!(concat!(env!("OUT_DIR"), "/events.rs"));
fn tick(&mut self, name: &str);
#[macro_use]
mod macros {
#[macro_export]
macro_rules! my_macro(() => (FooBar));
}
pub trait SysEvents: Send {
fn notify(&mut self, event: Event);
}
#[derive(Serialize, Deserialize, Debug)]
pub struct EventMessage {
pub sender: String,
pub events: Vec<Event>,
} }
pub struct RabbitMQ { pub struct RabbitMQ {
identity: String,
channel: Channel, channel: Channel,
} }
impl RabbitMQ { impl RabbitMQ {
pub fn new(channel: Channel) -> RabbitMQ { pub fn new(identity: &str, channel: Channel) -> RabbitMQ {
RabbitMQ { channel: channel } RabbitMQ { identity: identity.to_owned(), channel: channel }
} }
} }
impl SysEvents for RabbitMQ { impl SysEvents for RabbitMQ {
fn tick(&mut self, name: &str) { fn notify(&mut self, event: Event) {
let props = BasicProperties { ..Default::default() }; let props = BasicProperties { ..Default::default() };
self.channel self.channel
.basic_publish( .basic_publish(
@ -26,7 +43,10 @@ impl SysEvents for RabbitMQ {
false, false,
false, false,
props, props,
String::from(name).into_bytes(), serde_json::to_string(&EventMessage {
sender: self.identity.clone(),
events: vec![event],
}).unwrap().into_bytes(),
) )
.unwrap(); .unwrap();
} }

View file

@ -11,9 +11,10 @@ use std::path::PathBuf;
use ofborg::checkout; use ofborg::checkout;
use ofborg::message::{massrebuildjob, buildjob}; use ofborg::message::{massrebuildjob, buildjob};
use ofborg::nix::Nix; use ofborg::nix::Nix;
use std::time::Instant;
use ofborg::acl::ACL; use ofborg::acl::ACL;
use ofborg::stats; use ofborg::stats;
use ofborg::stats::Event;
use ofborg::worker; use ofborg::worker;
use ofborg::tagger::{StdenvTagger, RebuildTagger, PathsTagger, PkgsAddedRemovedTagger}; use ofborg::tagger::{StdenvTagger, RebuildTagger, PathsTagger, PkgsAddedRemovedTagger};
use ofborg::outpathdiff::{OutPaths, OutPathDiff}; use ofborg::outpathdiff::{OutPaths, OutPathDiff};
@ -87,7 +88,7 @@ impl<E: stats::SysEvents> MassRebuildWorker<E> {
} }
} }
impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> { impl<E: stats::SysEvents + 'static> worker::SimpleWorker for MassRebuildWorker<E> {
type J = massrebuildjob::MassRebuildJob; type J = massrebuildjob::MassRebuildJob;
fn msg_to_job( fn msg_to_job(
@ -96,14 +97,14 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
_: &BasicProperties, _: &BasicProperties,
body: &Vec<u8>, body: &Vec<u8>,
) -> Result<Self::J, String> { ) -> Result<Self::J, String> {
self.events.tick("job-received"); self.events.notify(Event::JobReceived);
return match massrebuildjob::from(body) { return match massrebuildjob::from(body) {
Ok(e) => { Ok(e) => {
self.events.tick("job-decode-success"); self.events.notify(Event::JobDecodeSuccess);
Ok(e) Ok(e)
} }
Err(e) => { Err(e) => {
self.events.tick("job-decode-failure"); self.events.notify(Event::JobDecodeFailure);
error!( error!(
"Failed to decode message: {:?}, Err: {:?}", "Failed to decode message: {:?}, Err: {:?}",
String::from_utf8(body.clone()), String::from_utf8(body.clone()),
@ -127,7 +128,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
match issue.get() { match issue.get() {
Ok(iss) => { Ok(iss) => {
if iss.state == "closed" { if iss.state == "closed" {
self.events.tick("issue-already-closed"); self.events.notify(Event::IssueAlreadyClosed);
info!("Skipping {} because it is closed", job.pr.number); info!("Skipping {} because it is closed", job.pr.number);
return self.actions().skip(&job); return self.actions().skip(&job);
} }
@ -142,7 +143,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
} }
} }
Err(e) => { Err(e) => {
self.events.tick("issue-fetch-failed"); self.events.notify(Event::IssueFetchFailed);
info!("Error fetching {}!", job.pr.number); info!("Error fetching {}!", job.pr.number);
info!("E: {:?}", e); info!("E: {:?}", e);
return self.actions().skip(&job); return self.actions().skip(&job);
@ -201,6 +202,8 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
hubcaps::statuses::State::Pending, hubcaps::statuses::State::Pending,
); );
let target_branch_rebuild_sniff_start = Instant::now();
if let Err(mut output) = rebuildsniff.find_before() { if let Err(mut output) = rebuildsniff.find_before() {
overall_status.set_url(make_gist( overall_status.set_url(make_gist(
&gists, &gists,
@ -209,6 +212,7 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
file_to_str(&mut output), file_to_str(&mut output),
)); ));
self.events.notify(Event::TargetBranchFailsEvaluation(target_branch.clone()));
overall_status.set_with_description( overall_status.set_with_description(
format!("Target branch {} doesn't evaluate!", &target_branch).as_ref(), format!("Target branch {} doesn't evaluate!", &target_branch).as_ref(),
hubcaps::statuses::State::Failure, hubcaps::statuses::State::Failure,
@ -216,6 +220,17 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
return self.actions().skip(&job); return self.actions().skip(&job);
} }
self.events.notify(
Event::EvaluationDuration(
target_branch.clone(),
target_branch_rebuild_sniff_start.elapsed().as_secs(),
)
);
self.events.notify(
Event::EvaluationDurationCount(
target_branch.clone()
)
);
overall_status.set_with_description("Fetching PR", hubcaps::statuses::State::Pending); overall_status.set_with_description("Fetching PR", hubcaps::statuses::State::Pending);
@ -525,6 +540,8 @@ impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
); );
} }
self.events.notify(Event::TaskEvaluationCheckComplete);
return self.actions().done(&job, response); return self.actions().done(&job, response);
} }
} }

View file

@ -3,4 +3,5 @@ pub mod build;
pub mod massrebuilder; pub mod massrebuilder;
pub mod githubcommentfilter; pub mod githubcommentfilter;
pub mod githubcommentposter; pub mod githubcommentposter;
pub mod statscollector;
pub mod log_message_collector; pub mod log_message_collector;

View file

@ -0,0 +1,70 @@
extern crate amqp;
extern crate env_logger;
use serde_json;
use ofborg::worker;
use ofborg::stats;
use amqp::protocol::basic::{Deliver, BasicProperties};
pub struct StatCollectorWorker<E> {
events: E,
collector: stats::MetricCollector,
}
impl<E: stats::SysEvents + 'static> StatCollectorWorker<E> {
pub fn new(events: E, collector: stats::MetricCollector) -> StatCollectorWorker<E> {
StatCollectorWorker {
events: events,
collector: collector,
}
}
}
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> {
type J = stats::EventMessage;
fn msg_to_job(
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(_) => {
let mut modified_body: Vec<u8> = vec!["\"".as_bytes()[0]];
modified_body.append(&mut body.clone());
modified_body.push("\"".as_bytes()[0]);
match serde_json::from_slice(&modified_body) {
Ok(e) => {
self.events.notify(stats::Event::StatCollectorLegacyEvent(stats::event_metric_name(&e)));
Ok(stats::EventMessage {
sender: "".to_owned(),
events: vec![e],
})
},
Err(e) => {
self.events.notify(stats::Event::StatCollectorBogusEvent);
error!(
"Failed to decode message: {:?}, Err: {:?}",
String::from_utf8(body.clone()),
e
);
Err("Failed to decode message".to_owned())
}
}
}
};
}
fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions {
let sender = job.sender.clone();
for event in job.events.iter() {
self.collector.record(sender.clone(), event.clone());
}
return vec![worker::Action::Ack];
}
}

View file

@ -54,8 +54,8 @@ where
}); });
} }
pub trait SimpleWorker { pub trait SimpleWorker: Send + 'static {
type J; type J: Send;
fn consumer(&mut self, job: &Self::J) -> Actions; fn consumer(&mut self, job: &Self::J) -> Actions;