diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 5acd17d..d621695 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -1,14 +1,13 @@ -use amqp::Basic; -use log::{info, log, warn}; -use ofborg::checkout; -use ofborg::config; -use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; -use ofborg::notifyworker; -use ofborg::tasks; - use std::env; use std::path::Path; +use async_std::task; +use log::{info, log, warn}; + +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use ofborg::easylapin; +use ofborg::{checkout, config, tasks}; + fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); @@ -27,68 +26,64 @@ fn main() { panic!(); }; - let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); - let mut channel = session.open_channel(1).unwrap(); - channel.basic_prefetch(1).unwrap(); - channel - .declare_exchange(easyamqp::ExchangeConfig { - exchange: "build-jobs".to_owned(), - exchange_type: easyamqp::ExchangeType::Fanout, - passive: false, - durable: true, - auto_delete: false, - no_wait: false, - internal: false, - }) - .unwrap(); + let conn = easylapin::from_config(&cfg.rabbitmq).unwrap(); + let mut chan = task::block_on(conn.create_channel()).unwrap(); + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "build-jobs".to_owned(), + exchange_type: easyamqp::ExchangeType::Fanout, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .unwrap(); let queue_name = if cfg.runner.build_all_jobs != Some(true) { let queue_name = format!("build-inputs-{}", cfg.nix.system.clone()); - channel - .declare_queue(easyamqp::QueueConfig { - queue: queue_name.clone(), - passive: false, - durable: true, - exclusive: false, - auto_delete: false, - no_wait: false, - }) - .unwrap(); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .unwrap(); queue_name } else { warn!("Building all jobs, please don't use this unless you're"); warn!("developing and have Graham's permission!"); let queue_name = "".to_owned(); - channel - .declare_queue(easyamqp::QueueConfig { - queue: queue_name.clone(), - passive: false, - durable: false, - exclusive: true, - auto_delete: true, - no_wait: false, - }) - .unwrap(); - queue_name - }; - - channel - .bind_queue(easyamqp::BindQueueConfig { + chan.declare_queue(easyamqp::QueueConfig { queue: queue_name.clone(), - exchange: "build-jobs".to_owned(), - routing_key: None, + passive: false, + durable: false, + exclusive: true, + auto_delete: true, no_wait: false, }) .unwrap(); + queue_name + }; - let mut channel = channel + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "build-jobs".to_owned(), + routing_key: None, + no_wait: false, + }) + .unwrap(); + + let handle = easylapin::NotifyChannel(chan) .consume( - notifyworker::new(tasks::build::BuildWorker::new( + tasks::build::BuildWorker::new( cloner, nix, cfg.nix.system.clone(), cfg.runner.identity.clone(), - )), + ), easyamqp::ConsumeConfig { queue: queue_name.clone(), consumer_tag: format!("{}-builder", cfg.whoami()), @@ -101,9 +96,8 @@ fn main() { .unwrap(); info!("Fetching jobs from {}", &queue_name); - channel.start_consuming(); - channel.close(200, "Bye").unwrap(); - info!("Closed the channel"); - session.close(200, "Good Bye"); + task::block_on(handle); + + drop(conn); // Close connection. info!("Closed the session... EOF"); } diff --git a/ofborg/src/tasks/evaluationfilter.rs b/ofborg/src/tasks/evaluationfilter.rs index 717b414..c8184b6 100644 --- a/ofborg/src/tasks/evaluationfilter.rs +++ b/ofborg/src/tasks/evaluationfilter.rs @@ -16,12 +16,7 @@ impl EvaluationFilterWorker { impl worker::SimpleWorker for EvaluationFilterWorker { type J = ghevent::PullRequestEvent; - fn msg_to_job( - &mut self, - _: &str, - _: &Option, - body: &[u8], - ) -> Result { + fn msg_to_job(&mut self, _: &str, _: &Option, body: &[u8]) -> Result { match serde_json::from_slice(body) { Ok(e) => Ok(e), Err(e) => Err(format!(