diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 5f964d1..ddf89b5 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{info, log, warn}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::notifyworker; use ofborg::tasks; @@ -39,38 +39,37 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); - let queue_name: String = if cfg.runner.build_all_jobs != Some(true) { + let queue_name = if cfg.runner.build_all_jobs != Some(true) { + let queue_name = format!("build-inputs-{}", cfg.nix.system.clone()); channel .declare_queue(easyamqp::QueueConfig { - queue: format!("build-inputs-{}", cfg.nix.system.clone()), + queue: queue_name.clone(), passive: false, durable: true, exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) - .unwrap() - .queue + .unwrap(); + queue_name } else { warn!("Building all jobs, please don't use this unless you're"); warn!("developing and have Graham's permission!"); + let queue_name = "".to_owned(); channel .declare_queue(easyamqp::QueueConfig { - queue: "".to_owned(), + queue: queue_name.clone(), passive: false, durable: false, exclusive: true, auto_delete: true, no_wait: false, - arguments: None, }) - .unwrap() - .queue + .unwrap(); + queue_name }; channel @@ -79,7 +78,6 @@ fn main() { exchange: "build-jobs".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -98,7 +96,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index 73415a3..e21f885 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; @@ -27,7 +27,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -39,7 +38,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -51,7 +49,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -61,7 +58,6 @@ fn main() { exchange: "github-events".to_owned(), routing_key: Some("pull_request.nixos/nixpkgs".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -78,7 +74,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index d7d2ebd..3ee0163 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; @@ -26,7 +26,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -39,7 +38,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -51,7 +49,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -61,7 +58,6 @@ fn main() { exchange: "github-events".to_owned(), routing_key: Some("issue_comment.*".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -79,7 +75,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 3faf332..7a723b1 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; @@ -23,7 +23,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -35,7 +34,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -45,7 +43,6 @@ fn main() { exchange: "build-results".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -62,7 +59,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 6047240..f7513b9 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -1,6 +1,6 @@ use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; @@ -25,22 +25,20 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); - let queue_name = channel + let queue_name = "".to_owned(); + channel .declare_queue(easyamqp::QueueConfig { - queue: "".to_owned(), + queue: queue_name.clone(), passive: false, durable: false, exclusive: true, auto_delete: true, no_wait: false, - arguments: None, }) - .unwrap() - .queue; + .unwrap(); channel .bind_queue(easyamqp::BindQueueConfig { @@ -48,7 +46,6 @@ fn main() { exchange: "logs".to_owned(), routing_key: Some("*.*".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -65,7 +62,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index f074749..eb310db 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{error, info, log}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::stats; use ofborg::tasks; use ofborg::worker; @@ -61,7 +61,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -76,7 +75,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 99bf464..3f147ad 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,7 +1,7 @@ use amqp::Basic; use hyper::server::{Request, Response, Server}; use log::{info, log}; -use ofborg::easyamqp::TypedWrappers; +use ofborg::easyamqp::{ChannelExt, ConsumerExt}; use ofborg::{config, easyamqp, stats, tasks, worker}; use std::env; @@ -35,7 +35,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -47,7 +46,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -57,7 +55,6 @@ fn main() { exchange: "stats".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -72,7 +69,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 412d753..d80eeec 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -41,10 +41,6 @@ pub struct ConsumeConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the consume. The syntax and semantics - /// of these arguments depends on the server implementation. - pub arguments: Option, } pub struct BindQueueConfig { @@ -88,10 +84,6 @@ pub struct BindQueueConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the binding. The syntax and semantics - /// of these arguments depends on the exchange class. - pub arguments: Option, } pub enum ExchangeType { @@ -192,11 +184,6 @@ pub struct ExchangeConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the declaration. The syntax and - /// semantics of these arguments depends on the server - /// implementation. - pub arguments: Option, } pub struct QueueConfig { @@ -274,11 +261,6 @@ pub struct QueueConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the declaration. The syntax and - /// semantics of these arguments depends on the server - /// implementation. - pub arguments: Option, } pub fn session_from_config(config: &RabbitMQConfig) -> Result { @@ -314,48 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result(&mut self, callback: T, config: ConsumeConfig) -> Result - where - T: amqp::Consumer + 'static; - - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result; - - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result; - - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result; +pub trait ChannelExt { + type Error; + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>; + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; } -impl TypedWrappers for amqp::Channel { - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result - where - T: amqp::Consumer + 'static, - { - self.basic_consume( - callback, - config.queue, - config.consumer_tag, - config.no_local, - config.no_ack, - config.exclusive, - config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), - ) - } +pub trait ConsumerExt { + type Error; + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>; +} - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result { +impl ChannelExt for amqp::Channel { + type Error = amqp::AMQPError; + + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { self.exchange_declare( config.exchange, config.exchange_type.into(), @@ -364,14 +320,12 @@ impl TypedWrappers for amqp::Channel { config.auto_delete, config.internal, config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), - ) + amqp::Table::new(), + )?; + Ok(()) } - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result { + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { self.queue_declare( config.queue, config.passive, @@ -379,20 +333,37 @@ impl TypedWrappers for amqp::Channel { config.exclusive, config.auto_delete, config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), - ) + amqp::Table::new(), + )?; + Ok(()) } - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result { + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { self.queue_bind( config.queue, config.exchange, config.routing_key.unwrap_or_else(|| "".to_owned()), config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), - ) + amqp::Table::new(), + )?; + Ok(()) + } +} + +impl ConsumerExt for amqp::Channel { + type Error = amqp::AMQPError; + + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> { + self.basic_consume( + callback, + config.queue, + config.consumer_tag, + config.no_local, + config.no_ack, + config.exclusive, + config.no_wait, + amqp::Table::new(), + )?; + Ok(()) } }