From a87cf357d053af40dc794e1e94120dcec58221cc Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Mon, 27 Apr 2020 21:48:14 +0200 Subject: [PATCH 1/4] remove amqp arguments These are not used anywhere and the table is a type specific to the amqp library. --- ofborg/src/bin/builder.rs | 5 ----- ofborg/src/bin/evaluation-filter.rs | 5 ----- ofborg/src/bin/github-comment-filter.rs | 5 ----- ofborg/src/bin/github-comment-poster.rs | 4 ---- ofborg/src/bin/log-message-collector.rs | 4 ---- ofborg/src/bin/mass-rebuilder.rs | 2 -- ofborg/src/bin/stats.rs | 4 ---- ofborg/src/easyamqp.rs | 26 ++++--------------------- 8 files changed, 4 insertions(+), 51 deletions(-) diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 5f964d1..e55bfaa 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -39,7 +39,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -52,7 +51,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap() .queue @@ -67,7 +65,6 @@ fn main() { exclusive: true, auto_delete: true, no_wait: false, - arguments: None, }) .unwrap() .queue @@ -79,7 +76,6 @@ fn main() { exchange: "build-jobs".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -98,7 +94,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index 73415a3..823fc9e 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -27,7 +27,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -39,7 +38,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -51,7 +49,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -61,7 +58,6 @@ fn main() { exchange: "github-events".to_owned(), routing_key: Some("pull_request.nixos/nixpkgs".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -78,7 +74,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index d7d2ebd..2fb2804 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -26,7 +26,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -39,7 +38,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -51,7 +49,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -61,7 +58,6 @@ fn main() { exchange: "github-events".to_owned(), routing_key: Some("issue_comment.*".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -79,7 +75,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 3faf332..45bca60 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -23,7 +23,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -35,7 +34,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -45,7 +43,6 @@ fn main() { exchange: "build-results".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -62,7 +59,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 6047240..9ea6616 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -25,7 +25,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -37,7 +36,6 @@ fn main() { exclusive: true, auto_delete: true, no_wait: false, - arguments: None, }) .unwrap() .queue; @@ -48,7 +46,6 @@ fn main() { exchange: "logs".to_owned(), routing_key: Some("*.*".to_owned()), no_wait: false, - arguments: None, }) .unwrap(); @@ -65,7 +62,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index f074749..7b14ea6 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -61,7 +61,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -76,7 +75,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 99bf464..273c08e 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -35,7 +35,6 @@ fn main() { auto_delete: false, no_wait: false, internal: false, - arguments: None, }) .unwrap(); @@ -47,7 +46,6 @@ fn main() { exclusive: false, auto_delete: false, no_wait: false, - arguments: None, }) .unwrap(); @@ -57,7 +55,6 @@ fn main() { exchange: "stats".to_owned(), routing_key: None, no_wait: false, - arguments: None, }) .unwrap(); @@ -72,7 +69,6 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 412d753..02f13a8 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -41,10 +41,6 @@ pub struct ConsumeConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the consume. The syntax and semantics - /// of these arguments depends on the server implementation. - pub arguments: Option, } pub struct BindQueueConfig { @@ -88,10 +84,6 @@ pub struct BindQueueConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the binding. The syntax and semantics - /// of these arguments depends on the exchange class. - pub arguments: Option, } pub enum ExchangeType { @@ -192,11 +184,6 @@ pub struct ExchangeConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the declaration. The syntax and - /// semantics of these arguments depends on the server - /// implementation. - pub arguments: Option, } pub struct QueueConfig { @@ -274,11 +261,6 @@ pub struct QueueConfig { /// complete the method it will raise a channel or connection /// exception. pub no_wait: bool, - - /// A set of arguments for the declaration. The syntax and - /// semantics of these arguments depends on the server - /// implementation. - pub arguments: Option, } pub fn session_from_config(config: &RabbitMQConfig) -> Result { @@ -348,7 +330,7 @@ impl TypedWrappers for amqp::Channel { config.no_ack, config.exclusive, config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), + amqp::Table::new(), ) } @@ -364,7 +346,7 @@ impl TypedWrappers for amqp::Channel { config.auto_delete, config.internal, config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), + amqp::Table::new(), ) } @@ -379,7 +361,7 @@ impl TypedWrappers for amqp::Channel { config.exclusive, config.auto_delete, config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), + amqp::Table::new(), ) } @@ -392,7 +374,7 @@ impl TypedWrappers for amqp::Channel { config.exchange, config.routing_key.unwrap_or_else(|| "".to_owned()), config.no_wait, - config.arguments.unwrap_or_else(amqp::Table::new), + amqp::Table::new(), ) } } From a34a4accaf1c1ed7fab926daddb7e83b98e5cf4d Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Mon, 27 Apr 2020 22:12:31 +0200 Subject: [PATCH 2/4] make amqp error type generic --- ofborg/src/easyamqp.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 02f13a8..23b616b 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -297,28 +297,32 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result(&mut self, callback: T, config: ConsumeConfig) -> Result + type Error; + + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result where T: amqp::Consumer + 'static; fn declare_exchange( &mut self, config: ExchangeConfig, - ) -> Result; + ) -> Result; fn declare_queue( &mut self, config: QueueConfig, - ) -> Result; + ) -> Result; fn bind_queue( &mut self, config: BindQueueConfig, - ) -> Result; + ) -> Result; } impl TypedWrappers for amqp::Channel { - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result + type Error = amqp::AMQPError; + + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result where T: amqp::Consumer + 'static, { @@ -337,7 +341,7 @@ impl TypedWrappers for amqp::Channel { fn declare_exchange( &mut self, config: ExchangeConfig, - ) -> Result { + ) -> Result { self.exchange_declare( config.exchange, config.exchange_type.into(), @@ -353,7 +357,7 @@ impl TypedWrappers for amqp::Channel { fn declare_queue( &mut self, config: QueueConfig, - ) -> Result { + ) -> Result { self.queue_declare( config.queue, config.passive, @@ -368,7 +372,7 @@ impl TypedWrappers for amqp::Channel { fn bind_queue( &mut self, config: BindQueueConfig, - ) -> Result { + ) -> Result { self.queue_bind( config.queue, config.exchange, From 98723462a098cc4e1795272da8f2844944b7f110 Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Mon, 27 Apr 2020 22:21:25 +0200 Subject: [PATCH 3/4] remove amqp return types --- ofborg/src/bin/builder.rs | 16 +++++++------- ofborg/src/bin/log-message-collector.rs | 8 +++---- ofborg/src/easyamqp.rs | 28 ++++++++++++++----------- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index e55bfaa..8fff50f 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -42,32 +42,34 @@ fn main() { }) .unwrap(); - let queue_name: String = if cfg.runner.build_all_jobs != Some(true) { + let queue_name = if cfg.runner.build_all_jobs != Some(true) { + let queue_name = format!("build-inputs-{}", cfg.nix.system.clone()); channel .declare_queue(easyamqp::QueueConfig { - queue: format!("build-inputs-{}", cfg.nix.system.clone()), + queue: queue_name.clone(), passive: false, durable: true, exclusive: false, auto_delete: false, no_wait: false, }) - .unwrap() - .queue + .unwrap(); + queue_name } else { warn!("Building all jobs, please don't use this unless you're"); warn!("developing and have Graham's permission!"); + let queue_name = "".to_owned(); channel .declare_queue(easyamqp::QueueConfig { - queue: "".to_owned(), + queue: queue_name.clone(), passive: false, durable: false, exclusive: true, auto_delete: true, no_wait: false, }) - .unwrap() - .queue + .unwrap(); + queue_name }; channel diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 9ea6616..4ff5958 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -28,17 +28,17 @@ fn main() { }) .unwrap(); - let queue_name = channel + let queue_name = "".to_owned(); + channel .declare_queue(easyamqp::QueueConfig { - queue: "".to_owned(), + queue: queue_name.clone(), passive: false, durable: false, exclusive: true, auto_delete: true, no_wait: false, }) - .unwrap() - .queue; + .unwrap(); channel .bind_queue(easyamqp::BindQueueConfig { diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 23b616b..b059a82 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -299,30 +299,30 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result(&mut self, callback: T, config: ConsumeConfig) -> Result + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> where T: amqp::Consumer + 'static; fn declare_exchange( &mut self, config: ExchangeConfig, - ) -> Result; + ) -> Result<(), Self::Error>; fn declare_queue( &mut self, config: QueueConfig, - ) -> Result; + ) -> Result<(), Self::Error>; fn bind_queue( &mut self, config: BindQueueConfig, - ) -> Result; + ) -> Result<(), Self::Error>; } impl TypedWrappers for amqp::Channel { type Error = amqp::AMQPError; - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> where T: amqp::Consumer + 'static, { @@ -335,13 +335,14 @@ impl TypedWrappers for amqp::Channel { config.exclusive, config.no_wait, amqp::Table::new(), - ) + )?; + Ok(()) } fn declare_exchange( &mut self, config: ExchangeConfig, - ) -> Result { + ) -> Result<(), Self::Error> { self.exchange_declare( config.exchange, config.exchange_type.into(), @@ -351,13 +352,14 @@ impl TypedWrappers for amqp::Channel { config.internal, config.no_wait, amqp::Table::new(), - ) + )?; + Ok(()) } fn declare_queue( &mut self, config: QueueConfig, - ) -> Result { + ) -> Result<(), Self::Error> { self.queue_declare( config.queue, config.passive, @@ -366,19 +368,21 @@ impl TypedWrappers for amqp::Channel { config.auto_delete, config.no_wait, amqp::Table::new(), - ) + )?; + Ok(()) } fn bind_queue( &mut self, config: BindQueueConfig, - ) -> Result { + ) -> Result<(), Self::Error> { self.queue_bind( config.queue, config.exchange, config.routing_key.unwrap_or_else(|| "".to_owned()), config.no_wait, amqp::Table::new(), - ) + )?; + Ok(()) } } From 5ad80e878bef2f84ff78bb2e6464c66a9d97b37c Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Mon, 27 Apr 2020 22:39:03 +0200 Subject: [PATCH 4/4] make amqp consume generic and split traits This removes the final library specfic type from the traits. --- ofborg/src/bin/builder.rs | 2 +- ofborg/src/bin/evaluation-filter.rs | 2 +- ofborg/src/bin/github-comment-filter.rs | 2 +- ofborg/src/bin/github-comment-poster.rs | 2 +- ofborg/src/bin/log-message-collector.rs | 2 +- ofborg/src/bin/mass-rebuilder.rs | 2 +- ofborg/src/bin/stats.rs | 2 +- ofborg/src/easyamqp.rs | 81 ++++++++++--------------- 8 files changed, 38 insertions(+), 57 deletions(-) diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 8fff50f..ddf89b5 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{info, log, warn}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::notifyworker; use ofborg::tasks; diff --git a/ofborg/src/bin/evaluation-filter.rs b/ofborg/src/bin/evaluation-filter.rs index 823fc9e..e21f885 100644 --- a/ofborg/src/bin/evaluation-filter.rs +++ b/ofborg/src/bin/evaluation-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 2fb2804..3ee0163 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/github-comment-poster.rs b/ofborg/src/bin/github-comment-poster.rs index 45bca60..7a723b1 100644 --- a/ofborg/src/bin/github-comment-poster.rs +++ b/ofborg/src/bin/github-comment-poster.rs @@ -1,7 +1,7 @@ use amqp::Basic; use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 4ff5958..f7513b9 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -1,6 +1,6 @@ use log::{info, log}; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 7b14ea6..eb310db 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -2,7 +2,7 @@ use amqp::Basic; use log::{error, info, log}; use ofborg::checkout; use ofborg::config; -use ofborg::easyamqp::{self, TypedWrappers}; +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; use ofborg::stats; use ofborg::tasks; use ofborg::worker; diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 273c08e..3f147ad 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -1,7 +1,7 @@ use amqp::Basic; use hyper::server::{Request, Response, Server}; use log::{info, log}; -use ofborg::easyamqp::TypedWrappers; +use ofborg::easyamqp::{ChannelExt, ConsumerExt}; use ofborg::{config, easyamqp, stats, tasks, worker}; use std::env; diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index b059a82..d80eeec 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -296,53 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> - where - T: amqp::Consumer + 'static; - - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result<(), Self::Error>; - - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result<(), Self::Error>; - - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result<(), Self::Error>; + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>; + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; } -impl TypedWrappers for amqp::Channel { +pub trait ConsumerExt { + type Error; + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>; +} + +impl ChannelExt for amqp::Channel { type Error = amqp::AMQPError; - fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> - where - T: amqp::Consumer + 'static, - { - self.basic_consume( - callback, - config.queue, - config.consumer_tag, - config.no_local, - config.no_ack, - config.exclusive, - config.no_wait, - amqp::Table::new(), - )?; - Ok(()) - } - - fn declare_exchange( - &mut self, - config: ExchangeConfig, - ) -> Result<(), Self::Error> { + fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> { self.exchange_declare( config.exchange, config.exchange_type.into(), @@ -356,10 +325,7 @@ impl TypedWrappers for amqp::Channel { Ok(()) } - fn declare_queue( - &mut self, - config: QueueConfig, - ) -> Result<(), Self::Error> { + fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> { self.queue_declare( config.queue, config.passive, @@ -372,10 +338,7 @@ impl TypedWrappers for amqp::Channel { Ok(()) } - fn bind_queue( - &mut self, - config: BindQueueConfig, - ) -> Result<(), Self::Error> { + fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> { self.queue_bind( config.queue, config.exchange, @@ -386,3 +349,21 @@ impl TypedWrappers for amqp::Channel { Ok(()) } } + +impl ConsumerExt for amqp::Channel { + type Error = amqp::AMQPError; + + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> { + self.basic_consume( + callback, + config.queue, + config.consumer_tag, + config.no_local, + config.no_ack, + config.exclusive, + config.no_wait, + amqp::Table::new(), + )?; + Ok(()) + } +}