Move builder logic in to a task

This commit is contained in:
Graham Christensen 2017-11-10 19:53:41 -05:00
parent 72018565dd
commit 83d4132f36
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
6 changed files with 128 additions and 125 deletions

View file

@ -3,25 +3,17 @@ extern crate amqp;
extern crate env_logger;
use std::{thread, time};
use std::collections::LinkedList;
use std::env;
use amqp::protocol::basic::{Deliver,BasicProperties};
use std::path::Path;
use amqp::Basic;
use amqp::Session;
use amqp::Channel;
use amqp::Table;
use std::process;
use std::io::Error;
use std::fs::File;
use std::io::BufReader;
use std::io::BufRead;
use ofborg::config;
use ofborg::checkout;
use ofborg::worker;
use ofborg::message::buildjob;
use ofborg::tasks;
use ofborg::nix;
@ -36,7 +28,7 @@ fn main() {
println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let mut hbchan = session.open_channel(1).unwrap();
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1");
@ -52,129 +44,28 @@ fn main() {
}
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = nix::new(cfg.nix.system.clone(), cfg.nix.remote);
let nix = nix::new(cfg.nix.system.clone(), cfg.nix.remote.clone());
channel.basic_consume(
worker::new(BuildWorker{
cloner: cloner,
nix: nix,
system: cfg.nix.system.clone()
}),
worker::new(tasks::build::BuildWorker::new(cloner, nix, cfg.nix.system.clone())),
"build-inputs-samples",
"lmao1",
format!("{}-builder", cfg.whoami()).as_ref(),
false,
false,
false,
false,
Table::new()
);
).unwrap();
let ten_sec = time::Duration::from_secs(10);
thread::sleep(ten_sec);
channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye");
println!("Closed the session... EOF");
}
struct BuildWorker {
cloner: checkout::CachedCloner,
nix: nix::Nix,
system: String,
}
impl BuildWorker {
fn actions(&self) -> buildjob::Actions {
return buildjob::Actions{
system: self.system.clone(),
};
}
}
impl worker::SimpleWorker for BuildWorker {
type J = buildjob::BuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
println!("lmao I got a job?");
return match buildjob::from(body) {
Ok(e) => { return Ok(e) }
Err(e) => {
println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e);
}
}
}
fn consumer(&self, job: &buildjob::BuildJob) -> worker::Actions {
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone());
let co = project.clone_for("builder".to_string(),
job.pr.number.to_string()).unwrap();
let target_branch = match job.pr.target_branch.clone() {
Some(x) => { x }
None => { String::from("origin/master") }
};
let refpath = co.checkout_ref(target_branch.as_ref()).unwrap();
co.fetch_pr(job.pr.number).unwrap();
co.merge_commit(job.pr.head_sha.as_ref()).unwrap();
println!("Got path: {:?}, building", refpath);
let success: bool;
let reader: BufReader<File>;
match self.nix.safely_build_attrs(refpath.as_ref(), job.attrs.clone()) {
Ok(r) => {
success = true;
reader = BufReader::new(r);
}
Err(r) => {
success = false;
reader = BufReader::new(r);
}
}
println!("ok built ({:?}), building", success);
let l10 = reader.lines().fold(LinkedList::new(),
|mut coll, line|
{
match line {
Ok(e) => { coll.push_back(e); }
Err(wtf) => {
println!("Got err in lines: {:?}", wtf);
coll.push_back(String::from("<line omitted due to error>"));
}
}
if coll.len() == 11 {
coll.pop_front();
}
return coll
}
);
println!("Lines: {:?}", l10);
let last10lines = l10.into_iter().collect::<Vec<_>>();
return self.actions().build_finished(
&job,
success,
last10lines.clone()
);
}
}

View file

@ -37,14 +37,14 @@ impl Actions {
return vec![
worker::Action::Publish(worker::QueueMsg{
exchange: Some("build-results-x".to_owned()),
exchange: Some("build-results".to_owned()),
routing_key: None,
mandatory: true,
immediate: false,
properties: Some(props),
content: serde_json::to_string(&msg).unwrap().into_bytes()
}),
worker::Action::NackRequeue
worker::Action::Ack
];
}
}

116
ofborg/src/tasks/build.rs Normal file
View file

@ -0,0 +1,116 @@
extern crate amqp;
extern crate env_logger;
use std::collections::LinkedList;
use std::fs::File;
use std::io::BufReader;
use std::io::BufRead;
use ofborg::checkout;
use ofborg::message::buildjob;
use ofborg::nix;
use ofborg::worker;
use amqp::protocol::basic::{Deliver,BasicProperties};
pub struct BuildWorker {
cloner: checkout::CachedCloner,
nix: nix::Nix,
system: String,
}
impl BuildWorker {
pub fn new(cloner: checkout::CachedCloner, nix: nix::Nix, system: String) -> BuildWorker {
return BuildWorker{
cloner: cloner,
nix: nix,
system: system,
};
}
fn actions(&self) -> buildjob::Actions {
return buildjob::Actions{
system: self.system.clone(),
};
}
}
impl worker::SimpleWorker for BuildWorker {
type J = buildjob::BuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
println!("lmao I got a job?");
return match buildjob::from(body) {
Ok(e) => { Ok(e) }
Err(e) => {
println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e);
}
}
}
fn consumer(&self, job: &buildjob::BuildJob) -> worker::Actions {
let project = self.cloner.project(job.repo.full_name.clone(), job.repo.clone_url.clone());
let co = project.clone_for("builder".to_string(),
job.pr.number.to_string()).unwrap();
let target_branch = match job.pr.target_branch.clone() {
Some(x) => { x }
None => { String::from("origin/master") }
};
let refpath = co.checkout_ref(target_branch.as_ref()).unwrap();
co.fetch_pr(job.pr.number).unwrap();
co.merge_commit(job.pr.head_sha.as_ref()).unwrap();
println!("Got path: {:?}, building", refpath);
let success: bool;
let reader: BufReader<File>;
match self.nix.safely_build_attrs(refpath.as_ref(), job.attrs.clone()) {
Ok(r) => {
success = true;
reader = BufReader::new(r);
}
Err(r) => {
success = false;
reader = BufReader::new(r);
}
}
println!("ok built ({:?}), building", success);
let l10 = reader.lines().fold(LinkedList::new(),
|mut coll, line|
{
match line {
Ok(e) => { coll.push_back(e); }
Err(wtf) => {
println!("Got err in lines: {:?}", wtf);
coll.push_back(String::from("<line omitted due to error>"));
}
}
if coll.len() == 11 {
coll.pop_front();
}
return coll
}
);
println!("Lines: {:?}", l10);
let last10lines = l10.into_iter().collect::<Vec<_>>();
return self.actions().build_finished(
&job,
success,
last10lines.clone()
);
}
}

View file

@ -47,7 +47,7 @@ impl worker::SimpleWorker for PlasticHeartbeatWorker {
return vec![
worker::Action::Publish(self.message()),
worker::Action::NackRequeue
worker::Action::Ack
];
}
}
@ -87,7 +87,7 @@ pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) {
}
),
queue_name,
String::from(consumer_name),
String::from(format!("{}-heartbeat", consumer_name)),
false,
false,
false,

View file

@ -1,2 +1,3 @@
pub mod heartbeat;
pub mod build;

View file

@ -61,32 +61,27 @@ impl <T: SimpleWorker + Send> Consumer for Worker<T> {
for action in self.internal.consumer(&job) {
match action {
Action::Ack => {
println!("Ack");
channel.basic_ack(method.delivery_tag, false).unwrap();
}
Action::NackRequeue => {
println!("Nack Requeue");
channel.basic_nack(method.delivery_tag, false, true).unwrap();
}
Action::NackDump => {
println!("Nack Dump");
channel.basic_nack(method.delivery_tag, false, false).unwrap();
}
Action::Publish(msg) => {
let exch = msg.exchange.clone().unwrap_or("".to_owned());
let key = msg.routing_key.clone().unwrap_or("".to_owned());
println!("Publishing, {:?} -> {:?}: {:?}", exch, key, msg);
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()});
println!("{:?}", channel.basic_publish(
channel.basic_publish(
exch,
key,
msg.mandatory,
msg.immediate,
props,
msg.content
).unwrap());
).unwrap();
}
}
}