Stats support maybe

This commit is contained in:
Graham Christensen 2018-01-13 12:32:04 -05:00
parent 960e9766d4
commit 9fcb621b48
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
8 changed files with 62 additions and 17 deletions

View file

@ -11,6 +11,7 @@ use ofborg::tasks;
use ofborg::config;
use ofborg::checkout;
use ofborg::stats;
use ofborg::worker;
use amqp::Session;
use amqp::Table;
@ -47,13 +48,14 @@ fn main() {
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix();
let events = stats::RabbitMQ::new(session.open_channel(3).unwrap());
let mrw = tasks::massrebuilder::MassRebuildWorker::new(
cloner,
nix,
cfg.github(),
cfg.runner.identity.clone(),
events
);
channel.basic_prefetch(1).unwrap();

View file

@ -25,6 +25,7 @@ pub mod message;
pub mod tasks;
pub mod evalchecker;
pub mod nix;
pub mod stats;
pub mod ghevent;
pub mod commentparser;
pub mod commitstatus;
@ -35,6 +36,7 @@ pub mod notifyworker;
pub mod ofborg {
pub use asynccmd;
pub use stats;
pub use config;
pub use checkout;
pub use locks;

33
ofborg/src/stats.rs Normal file
View file

@ -0,0 +1,33 @@
use amqp::Channel;
use amqp::protocol::basic::BasicProperties;
use amqp::Basic;
pub trait SysEvents {
fn tick(&mut self, name: &str);
}
pub struct RabbitMQ {
channel: Channel
}
impl RabbitMQ {
pub fn new(channel: Channel) -> RabbitMQ {
RabbitMQ {
channel: channel
}
}
}
impl SysEvents for RabbitMQ {
fn tick(&mut self, name: &str) {
let props = BasicProperties{ ..Default::default()};
self.channel.basic_publish(
String::from("stats"),
"".to_owned(),
false,
false,
props,
String::from(name).into_bytes()
).unwrap();
}
}

View file

@ -43,7 +43,7 @@ impl BuildWorker {
impl worker::SimpleWorker for BuildWorker {
type J = buildjob::BuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
println!("lmao I got a job?");
return match buildjob::from(body) {
@ -55,7 +55,7 @@ impl worker::SimpleWorker for BuildWorker {
}
}
fn consumer(&self, job: &buildjob::BuildJob) -> worker::Actions {
fn consumer(&mut self, job: &buildjob::BuildJob) -> worker::Actions {
info!("Working on {}", job.pr.number);
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone());
let co = project.clone_for("builder".to_string(),

View file

@ -29,7 +29,7 @@ impl GitHubCommentWorker {
impl worker::SimpleWorker for GitHubCommentWorker {
type J = ghevent::IssueComment;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
return match serde_json::from_slice(body) {
Ok(e) => { Ok(e) }
@ -40,7 +40,7 @@ impl worker::SimpleWorker for GitHubCommentWorker {
}
}
fn consumer(&self, job: &ghevent::IssueComment) -> worker::Actions {
fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions {
let instructions = commentparser::parse(&job.comment.body);
if instructions == None {
return vec![

View file

@ -31,7 +31,7 @@ impl PlasticHeartbeatWorker {
impl worker::SimpleWorker for PlasticHeartbeatWorker {
type J = plasticheartbeat::PlasticHeartbeat;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
return match plasticheartbeat::from(body) {
Ok(e) => { Ok(e) }
@ -42,7 +42,7 @@ impl worker::SimpleWorker for PlasticHeartbeatWorker {
}
}
fn consumer(&self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions {
fn consumer(&mut self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions {
thread::sleep(time::Duration::from_secs(5));
return vec![

View file

@ -12,6 +12,7 @@ use ofborg::checkout;
use ofborg::message::massrebuildjob;
use ofborg::nix::Nix;
use ofborg::stats;
use ofborg::worker;
use ofborg::tagger::{StdenvTagger,RebuildTagger};
use ofborg::outpathdiff::{OutPaths, OutPathDiff};
@ -20,20 +21,22 @@ use ofborg::commitstatus::CommitStatus;
use amqp::protocol::basic::{Deliver,BasicProperties};
use hubcaps;
pub struct MassRebuildWorker {
pub struct MassRebuildWorker<E> {
cloner: checkout::CachedCloner,
nix: Nix,
github: hubcaps::Github,
identity: String,
events: E,
}
impl MassRebuildWorker {
pub fn new(cloner: checkout::CachedCloner, nix: Nix, github: hubcaps::Github, identity: String) -> MassRebuildWorker {
impl<E: stats::SysEvents> MassRebuildWorker<E> {
pub fn new(cloner: checkout::CachedCloner, nix: Nix, github: hubcaps::Github, identity: String, events: E) -> MassRebuildWorker<E> {
return MassRebuildWorker{
cloner: cloner,
nix: nix,
github: github,
identity: identity
identity: identity,
events: events,
};
}
@ -43,21 +46,26 @@ impl MassRebuildWorker {
}
}
impl worker::SimpleWorker for MassRebuildWorker {
impl<E: stats::SysEvents> worker::SimpleWorker for MassRebuildWorker<E> {
type J = massrebuildjob::MassRebuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
fn msg_to_job(&mut self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
self.events.tick("job-received");
return match massrebuildjob::from(body) {
Ok(e) => { Ok(e) }
Ok(e) => {
self.events.tick("job-decode-success");
Ok(e)
}
Err(e) => {
self.events.tick("job-decode-failure");
println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e);
}
}
}
fn consumer(&self, job: &massrebuildjob::MassRebuildJob) -> worker::Actions {
fn consumer(&mut self, job: &massrebuildjob::MassRebuildJob) -> worker::Actions {
let repo = self.github
.repo(job.repo.owner.clone(), job.repo.name.clone());
let gists = self.github.gists();

View file

@ -53,9 +53,9 @@ pub fn publish_serde_action<T: ?Sized>(exchange: Option<String>, routing_key: Op
pub trait SimpleWorker {
type J;
fn consumer(&self, job: &Self::J) -> Actions;
fn consumer(&mut self, job: &Self::J) -> Actions;
fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties,
fn msg_to_job(&mut self, method: &Deliver, headers: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String>;
}