Return Actions instead of letting response handlers send messages directly

This commit is contained in:
Graham Christensen 2017-11-09 13:08:00 -05:00
parent b504f7722c
commit 3d0f728d92
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
3 changed files with 47 additions and 24 deletions

View file

@ -3,12 +3,10 @@ extern crate amqp;
use std::collections::LinkedList; use std::collections::LinkedList;
use std::env; use std::env;
use amqp::{Consumer, Channel};
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver,BasicProperties};
use std::path::Path; use std::path::Path;
use amqp::Basic; use amqp::Basic;
use amqp::protocol;
use amqp::Session; use amqp::Session;
use amqp::Table; use amqp::Table;
use std::process; use std::process;
@ -80,7 +78,7 @@ impl BuildWorker {
impl worker::SimpleWorker for BuildWorker { impl worker::SimpleWorker for BuildWorker {
type J = buildjob::BuildJob; type J = buildjob::BuildJob;
fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> { body: &Vec<u8>) -> Result<Self::J, String> {
println!("lmao I got a job?"); println!("lmao I got a job?");
return match buildjob::from(body) { return match buildjob::from(body) {
@ -92,10 +90,10 @@ impl worker::SimpleWorker for BuildWorker {
} }
} }
fn consumer(&self, job: &buildjob::BuildJob) -> Result<(), Error> { fn consumer(&self, job: &buildjob::BuildJob) -> worker::Actions {
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone()); let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone());
let co = project.clone_for("builder".to_string(), let co = project.clone_for("builder".to_string(),
job.pr.number.to_string())?; job.pr.number.to_string()).unwrap();
let target_branch = match job.pr.target_branch.clone() { let target_branch = match job.pr.target_branch.clone() {
Some(x) => { x } Some(x) => { x }
@ -146,15 +144,11 @@ impl worker::SimpleWorker for BuildWorker {
let last10lines = l10.into_iter().collect::<Vec<_>>(); let last10lines = l10.into_iter().collect::<Vec<_>>();
/*
resp.build_finished(
&job,
&mut channel,
success,
last10lines.clone()
);
*/
return Ok(()) return self.actions().build_finished(
&job,
success,
last10lines.clone()
);
} }
} }

View file

@ -1,7 +1,8 @@
use ofborg::message::{Pr,Repo}; use ofborg::message::{Pr,Repo};
use ofborg::message::buildresult; use ofborg::message::buildresult;
use ofborg::worker;
use serde_json; use serde_json;
use amqp::{Basic, Channel, protocol}; use amqp::{Channel, protocol};
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct BuildJob { pub struct BuildJob {
@ -19,7 +20,7 @@ pub struct Actions {
} }
impl Actions { impl Actions {
pub fn build_finished(&mut self, job: &BuildJob, channel: &mut Channel, success: bool, lines: Vec<String>) { pub fn build_finished(&mut self, job: &BuildJob, success: bool, lines: Vec<String>) -> worker::Actions {
let msg = buildresult::BuildResult { let msg = buildresult::BuildResult {
repo: job.repo.clone(), repo: job.repo.clone(),
pr: job.pr.clone(), pr: job.pr.clone(),
@ -34,8 +35,16 @@ impl Actions {
}; };
return vec![
channel.basic_publish("build-results", "", true, true, worker::Action::Publish(worker::QueueMsg{
props, serde_json::to_string(&msg).unwrap().into_bytes()).unwrap(); exchange: Some("build-results".to_owned()),
routing_key: None,
mandatory: true,
immediate: true,
properties: Some(props),
content: serde_json::to_string(&msg).unwrap().into_bytes()
}),
worker::Action::Ack
];
} }
} }

View file

@ -2,7 +2,7 @@
use amqp::{Consumer, Channel}; use amqp::{Consumer, Channel};
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver,BasicProperties};
use std::marker::Send; use std::marker::Send;
use std::io::Error;
pub struct Worker<T: SimpleWorker> { pub struct Worker<T: SimpleWorker> {
internal: T internal: T
@ -11,14 +11,28 @@ pub struct Worker<T: SimpleWorker> {
pub struct Response { pub struct Response {
} }
enum Action { pub type Actions = Vec<Action>;
pub enum Action {
Ack,
Nack,
Publish(QueueMsg),
} }
pub struct QueueMsg {
pub exchange: Option<String>,
pub routing_key: Option<String>,
pub mandatory: bool,
pub immediate: bool,
pub properties: Option<BasicProperties>,
pub content: Vec<u8>,
}
pub trait SimpleWorker { pub trait SimpleWorker {
type J; type J;
fn consumer(&self, job: &Self::J) -> Result<(), Error>; fn consumer(&self, job: &Self::J) -> Actions;
fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String>; body: &Vec<u8>) -> Result<Self::J, String>;
@ -34,12 +48,18 @@ pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> {
impl <T: SimpleWorker + Send> Consumer for Worker<T> { impl <T: SimpleWorker + Send> Consumer for Worker<T> {
fn handle_delivery(&mut self, fn handle_delivery(&mut self,
channel: &mut Channel, _: &mut Channel,
method: Deliver, method: Deliver,
headers: BasicProperties, headers: BasicProperties,
body: Vec<u8>) { body: Vec<u8>) {
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
self.internal.consumer(&job).unwrap(); for action in self.internal.consumer(&job) {
match action {
Action::Ack => {}
Action::Nack => {}
Action::Publish(_) => {}
}
}
} }
} }