Write a evaluation job filter in rust

This commit is contained in:
Graham Christensen 2018-02-23 21:23:50 -05:00
parent de08c2eb65
commit 6e305e16aa
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
3 changed files with 243 additions and 0 deletions

View file

@ -0,0 +1,93 @@
extern crate ofborg;
extern crate amqp;
extern crate env_logger;
extern crate hyper;
extern crate hubcaps;
extern crate hyper_native_tls;
use std::env;
use amqp::Basic;
use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
ofborg::setup_log();
println!("Hello, world!");
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let mut channel = session.open_channel(1).unwrap();
channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();
channel
.declare_queue(easyamqp::QueueConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
arguments: None,
})
.unwrap();
channel
.bind_queue(easyamqp::BindQueueConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/nixpkgs".to_owned()),
no_wait: false,
arguments: None,
})
.unwrap();
channel.basic_prefetch(1).unwrap();
channel
.consume(
worker::new(tasks::evaluationfilter::EvaluationFilterWorker::new(
cfg.acl(),
)),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-inputs".to_owned(),
consumer_tag: format!("{}-evaluation-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye");
println!("Closed the session... EOF");
}

View file

@ -0,0 +1,149 @@
extern crate amqp;
extern crate env_logger;
use ofborg::ghevent;
use ofborg::acl;
use serde_json;
use ofborg::message::{Repo, Pr, massrebuildjob};
use ofborg::worker;
use amqp::protocol::basic::{Deliver, BasicProperties};
pub struct EvaluationFilterWorker {
acl: acl::ACL,
}
impl EvaluationFilterWorker {
pub fn new(acl: acl::ACL) -> EvaluationFilterWorker {
return EvaluationFilterWorker {
acl: acl,
};
}
}
impl worker::SimpleWorker for EvaluationFilterWorker {
type J = ghevent::PullRequestEvent;
fn msg_to_job(
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => {
Err(format!(
"Failed to deserialize job {:?}: {:?}",
e,
String::from_utf8(body.clone())
))
}
};
}
fn consumer(&mut self, job: &ghevent::PullRequestEvent) -> worker::Actions {
if !self.acl.is_repo_eligible(&job.repository.full_name) {
info!("Repo not authorized ({})", job.repository.full_name);
return vec![worker::Action::Ack];
}
if job.pull_request.state != ghevent::PullRequestState::Open {
info!("PR is not open ({}#{})", job.repository.full_name, job.number);
return vec![worker::Action::Ack];
}
let interesting: bool = match job.action {
ghevent::PullRequestAction::Opened => true,
ghevent::PullRequestAction::Synchronize => true,
ghevent::PullRequestAction::Reopened => true,
ghevent::PullRequestAction::Edited => {
if let Some(ref changes) = job.changes {
changes.base.is_some()
} else {
false
}
},
_ => false,
};
if interesting {
let repo_msg = Repo {
clone_url: job.repository.clone_url.clone(),
full_name: job.repository.full_name.clone(),
owner: job.repository.owner.login.clone(),
name: job.repository.name.clone(),
};
let pr_msg = Pr {
number: job.number.clone(),
head_sha: job.pull_request.head.sha.clone(),
target_branch: Some(job.pull_request.base.git_ref.clone()),
};
let msg = massrebuildjob::MassRebuildJob {
repo: repo_msg.clone(),
pr: pr_msg.clone(),
};
return vec![
worker::publish_serde_action(
Some("mass-rebuild-check-jobs".to_owned()),
None,
&msg
),
worker::Action::Ack
];
}
return vec![
worker::Action::Ack
];
}
}
#[cfg(test)]
mod tests {
use worker::SimpleWorker;
use super::*;
#[test]
fn changed_base() {
let data = include_str!("../../test-srcs/events/pr-changed-base.json");
let job: ghevent::PullRequestEvent =
serde_json::from_str(&data.to_string())
.expect("Should properly deserialize");
let mut worker = EvaluationFilterWorker::new(acl::ACL::new(
vec!["nixos/nixpkgs".to_owned()],
vec![],
vec![],
));
assert_eq!(
worker.consumer(&job),
vec![
worker::publish_serde_action(
Some("mass-rebuild-check-jobs".to_owned()),
None,
&massrebuildjob::MassRebuildJob {
repo: Repo {
clone_url: String::from("https://github.com/NixOS/nixpkgs.git"),
full_name: String::from("NixOS/nixpkgs"),
owner: String::from("NixOS"),
name: String::from("nixpkgs"),
},
pr: Pr {
number: 33299,
head_sha: String::from("887e8b460a7d45ddb3bbdebe01447b251b3229e8"),
target_branch: Some(String::from("staging")),
},
}
),
worker::Action::Ack,
]
);
}
}

View file

@ -5,3 +5,4 @@ pub mod githubcommentfilter;
pub mod githubcommentposter;
pub mod statscollector;
pub mod log_message_collector;
pub mod evaluationfilter;