From 5e357d5e01b96f8bd740f1d2c9c5ddd884e121f8 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:27:11 -0500 Subject: [PATCH] Migrate fully to session_from_config and the typed consume() wrapper --- ofborg/src/bin/build-faker.rs | 6 ++---- ofborg/src/bin/github-comment-filter.rs | 20 ++++++++++---------- ofborg/src/bin/log-message-collector.rs | 23 ++++++++++++----------- ofborg/src/bin/log-message-generator.rs | 4 +--- ofborg/src/bin/mass-rebuilder.rs | 22 +++++++++++----------- 5 files changed, 36 insertions(+), 39 deletions(-) diff --git a/ofborg/src/bin/build-faker.rs b/ofborg/src/bin/build-faker.rs index 2d56cb4..7752db4 100644 --- a/ofborg/src/bin/build-faker.rs +++ b/ofborg/src/bin/build-faker.rs @@ -9,15 +9,13 @@ extern crate hyper_native_tls; use std::env; -use amqp::Session; - use ofborg::config; use ofborg::worker; use ofborg::notifyworker; use ofborg::notifyworker::NotificationReceiver; use ofborg::commentparser; use ofborg::message::buildjob; - +use ofborg::easyamqp; use ofborg::message::{Pr, Repo}; fn main() { @@ -27,7 +25,7 @@ fn main() { println!("Hello, world!"); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index bf31ebe..25763e3 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -10,8 +10,6 @@ extern crate hyper_native_tls; use std::env; use amqp::Basic; -use amqp::Session; -use amqp::Table; use ofborg::config; use ofborg::worker; @@ -34,18 +32,20 @@ fn main() { channel.basic_prefetch(1).unwrap(); channel - .basic_consume( + .consume( worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new( cfg.acl(), cfg.github(), )), - "build-inputs", - format!("{}-github-comment-filter", cfg.whoami()).as_ref(), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: "build-inputs".to_owned(), + consumer_tag: format!("{}-github-comment-filter", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index a831ea9..76f8de5 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -5,13 +5,11 @@ extern crate env_logger; use std::env; use std::path::PathBuf; -use amqp::Session; use amqp::Table; use ofborg::config; use ofborg::worker; use ofborg::tasks; -use amqp::Basic; use ofborg::easyamqp; use ofborg::easyamqp::TypedWrappers; @@ -49,22 +47,25 @@ fn main() { .unwrap(); - channel - .basic_consume( + channel + .consume( worker::new(tasks::log_message_collector::LogMessageCollector::new( PathBuf::from(cfg.log_storage.clone().unwrap().path), 100, )), - queue_name, - format!("{}-log-collector", cfg.whoami()), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: queue_name, + consumer_tag: format!("{}-log-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap(); + channel.start_consuming(); println!("Finished consuming?"); diff --git a/ofborg/src/bin/log-message-generator.rs b/ofborg/src/bin/log-message-generator.rs index 9eaf6f9..6d93544 100644 --- a/ofborg/src/bin/log-message-generator.rs +++ b/ofborg/src/bin/log-message-generator.rs @@ -6,7 +6,6 @@ use std::env; use std::time::Duration; use std::thread; -use amqp::Session; use ofborg::message::{Pr, Repo}; use ofborg::config; @@ -14,13 +13,12 @@ use ofborg::notifyworker; use ofborg::tasks::build; use ofborg::message::buildjob; use ofborg::easyamqp; -use ofborg::easyamqp::TypedWrappers; fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); ofborg::setup_log(); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); println!("About to open channel #1"); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 1b70699..cded68b 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -10,8 +10,6 @@ use ofborg::checkout; use ofborg::stats; use ofborg::worker; -use amqp::Session; -use amqp::Table; use amqp::Basic; use ofborg::easyamqp; use ofborg::easyamqp::TypedWrappers; @@ -24,7 +22,7 @@ fn main() { println!("Hello, world!"); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); let mut channel = session.open_channel(1).unwrap(); @@ -44,15 +42,17 @@ fn main() { channel.basic_prefetch(1).unwrap(); channel - .basic_consume( + .consume( worker::new(mrw), - "mass-rebuild-check-jobs", - format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: "mass-rebuild-check-jobs".to_owned(), + consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap();