convert builder to easylapin

This is probably the safest thing to start with since it can be tested
separately from the main deployment and isn't only used for secondary
checks.
This commit is contained in:
Daiderd Jordan 2020-04-28 22:19:08 +02:00
parent 0ecd8cc74c
commit eae94fe00a
No known key found for this signature in database
GPG key ID: D02435D05B810C96
2 changed files with 51 additions and 62 deletions

View file

@ -1,14 +1,13 @@
use amqp::Basic;
use log::{info, log, warn};
use ofborg::checkout;
use ofborg::config;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::notifyworker;
use ofborg::tasks;
use std::env;
use std::path::Path;
use async_std::task;
use log::{info, log, warn};
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::{checkout, config, tasks};
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
@ -27,68 +26,64 @@ fn main() {
panic!();
};
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
let mut channel = session.open_channel(1).unwrap();
channel.basic_prefetch(1).unwrap();
channel
.declare_exchange(easyamqp::ExchangeConfig {
exchange: "build-jobs".to_owned(),
exchange_type: easyamqp::ExchangeType::Fanout,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.unwrap();
let conn = easylapin::from_config(&cfg.rabbitmq).unwrap();
let mut chan = task::block_on(conn.create_channel()).unwrap();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "build-jobs".to_owned(),
exchange_type: easyamqp::ExchangeType::Fanout,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.unwrap();
let queue_name = if cfg.runner.build_all_jobs != Some(true) {
let queue_name = format!("build-inputs-{}", cfg.nix.system.clone());
channel
.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.unwrap();
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.unwrap();
queue_name
} else {
warn!("Building all jobs, please don't use this unless you're");
warn!("developing and have Graham's permission!");
let queue_name = "".to_owned();
channel
.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
})
.unwrap();
queue_name
};
channel
.bind_queue(easyamqp::BindQueueConfig {
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
exchange: "build-jobs".to_owned(),
routing_key: None,
passive: false,
durable: false,
exclusive: true,
auto_delete: true,
no_wait: false,
})
.unwrap();
queue_name
};
let mut channel = channel
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "build-jobs".to_owned(),
routing_key: None,
no_wait: false,
})
.unwrap();
let handle = easylapin::NotifyChannel(chan)
.consume(
notifyworker::new(tasks::build::BuildWorker::new(
tasks::build::BuildWorker::new(
cloner,
nix,
cfg.nix.system.clone(),
cfg.runner.identity.clone(),
)),
),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-builder", cfg.whoami()),
@ -101,9 +96,8 @@ fn main() {
.unwrap();
info!("Fetching jobs from {}", &queue_name);
channel.start_consuming();
channel.close(200, "Bye").unwrap();
info!("Closed the channel");
session.close(200, "Good Bye");
task::block_on(handle);
drop(conn); // Close connection.
info!("Closed the session... EOF");
}

View file

@ -16,12 +16,7 @@ impl EvaluationFilterWorker {
impl worker::SimpleWorker for EvaluationFilterWorker {
type J = ghevent::PullRequestEvent;
fn msg_to_job(
&mut self,
_: &str,
_: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
fn msg_to_job(&mut self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => Err(format!(