Merge pull request #473 from LnL7/generic-easyamqp

generic easyamqp
This commit is contained in:
Graham Christensen 2020-04-27 16:51:27 -04:00 committed by GitHub
commit ed4fcea227
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 62 additions and 118 deletions

View file

@ -2,7 +2,7 @@ use amqp::Basic;
use log::{info, log, warn}; use log::{info, log, warn};
use ofborg::checkout; use ofborg::checkout;
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::notifyworker; use ofborg::notifyworker;
use ofborg::tasks; use ofborg::tasks;
@ -39,38 +39,37 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
let queue_name: String = if cfg.runner.build_all_jobs != Some(true) { let queue_name = if cfg.runner.build_all_jobs != Some(true) {
let queue_name = format!("build-inputs-{}", cfg.nix.system.clone());
channel channel
.declare_queue(easyamqp::QueueConfig { .declare_queue(easyamqp::QueueConfig {
queue: format!("build-inputs-{}", cfg.nix.system.clone()), queue: queue_name.clone(),
passive: false, passive: false,
durable: true, durable: true,
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap() .unwrap();
.queue queue_name
} else { } else {
warn!("Building all jobs, please don't use this unless you're"); warn!("Building all jobs, please don't use this unless you're");
warn!("developing and have Graham's permission!"); warn!("developing and have Graham's permission!");
let queue_name = "".to_owned();
channel channel
.declare_queue(easyamqp::QueueConfig { .declare_queue(easyamqp::QueueConfig {
queue: "".to_owned(), queue: queue_name.clone(),
passive: false, passive: false,
durable: false, durable: false,
exclusive: true, exclusive: true,
auto_delete: true, auto_delete: true,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap() .unwrap();
.queue queue_name
}; };
channel channel
@ -79,7 +78,6 @@ fn main() {
exchange: "build-jobs".to_owned(), exchange: "build-jobs".to_owned(),
routing_key: None, routing_key: None,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -98,7 +96,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;
@ -27,7 +27,6 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -39,7 +38,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -51,7 +49,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -61,7 +58,6 @@ fn main() {
exchange: "github-events".to_owned(), exchange: "github-events".to_owned(),
routing_key: Some("pull_request.nixos/nixpkgs".to_owned()), routing_key: Some("pull_request.nixos/nixpkgs".to_owned()),
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -78,7 +74,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;
@ -26,7 +26,6 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -39,7 +38,6 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -51,7 +49,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -61,7 +58,6 @@ fn main() {
exchange: "github-events".to_owned(), exchange: "github-events".to_owned(),
routing_key: Some("issue_comment.*".to_owned()), routing_key: Some("issue_comment.*".to_owned()),
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -79,7 +75,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;
@ -23,7 +23,6 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -35,7 +34,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -45,7 +43,6 @@ fn main() {
exchange: "build-results".to_owned(), exchange: "build-results".to_owned(),
routing_key: None, routing_key: None,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -62,7 +59,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -1,6 +1,6 @@
use log::{info, log}; use log::{info, log};
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;
@ -25,22 +25,20 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
let queue_name = channel let queue_name = "".to_owned();
channel
.declare_queue(easyamqp::QueueConfig { .declare_queue(easyamqp::QueueConfig {
queue: "".to_owned(), queue: queue_name.clone(),
passive: false, passive: false,
durable: false, durable: false,
exclusive: true, exclusive: true,
auto_delete: true, auto_delete: true,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap() .unwrap();
.queue;
channel channel
.bind_queue(easyamqp::BindQueueConfig { .bind_queue(easyamqp::BindQueueConfig {
@ -48,7 +46,6 @@ fn main() {
exchange: "logs".to_owned(), exchange: "logs".to_owned(),
routing_key: Some("*.*".to_owned()), routing_key: Some("*.*".to_owned()),
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -65,7 +62,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -2,7 +2,7 @@ use amqp::Basic;
use log::{error, info, log}; use log::{error, info, log};
use ofborg::checkout; use ofborg::checkout;
use ofborg::config; use ofborg::config;
use ofborg::easyamqp::{self, TypedWrappers}; use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::stats; use ofborg::stats;
use ofborg::tasks; use ofborg::tasks;
use ofborg::worker; use ofborg::worker;
@ -61,7 +61,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -76,7 +75,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -1,7 +1,7 @@
use amqp::Basic; use amqp::Basic;
use hyper::server::{Request, Response, Server}; use hyper::server::{Request, Response, Server};
use log::{info, log}; use log::{info, log};
use ofborg::easyamqp::TypedWrappers; use ofborg::easyamqp::{ChannelExt, ConsumerExt};
use ofborg::{config, easyamqp, stats, tasks, worker}; use ofborg::{config, easyamqp, stats, tasks, worker};
use std::env; use std::env;
@ -35,7 +35,6 @@ fn main() {
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
internal: false, internal: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -47,7 +46,6 @@ fn main() {
exclusive: false, exclusive: false,
auto_delete: false, auto_delete: false,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -57,7 +55,6 @@ fn main() {
exchange: "stats".to_owned(), exchange: "stats".to_owned(),
routing_key: None, routing_key: None,
no_wait: false, no_wait: false,
arguments: None,
}) })
.unwrap(); .unwrap();
@ -72,7 +69,6 @@ fn main() {
no_ack: false, no_ack: false,
no_wait: false, no_wait: false,
exclusive: false, exclusive: false,
arguments: None,
}, },
) )
.unwrap(); .unwrap();

View file

@ -41,10 +41,6 @@ pub struct ConsumeConfig {
/// complete the method it will raise a channel or connection /// complete the method it will raise a channel or connection
/// exception. /// exception.
pub no_wait: bool, pub no_wait: bool,
/// A set of arguments for the consume. The syntax and semantics
/// of these arguments depends on the server implementation.
pub arguments: Option<amqp::Table>,
} }
pub struct BindQueueConfig { pub struct BindQueueConfig {
@ -88,10 +84,6 @@ pub struct BindQueueConfig {
/// complete the method it will raise a channel or connection /// complete the method it will raise a channel or connection
/// exception. /// exception.
pub no_wait: bool, pub no_wait: bool,
/// A set of arguments for the binding. The syntax and semantics
/// of these arguments depends on the exchange class.
pub arguments: Option<amqp::Table>,
} }
pub enum ExchangeType { pub enum ExchangeType {
@ -192,11 +184,6 @@ pub struct ExchangeConfig {
/// complete the method it will raise a channel or connection /// complete the method it will raise a channel or connection
/// exception. /// exception.
pub no_wait: bool, pub no_wait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
pub arguments: Option<amqp::Table>,
} }
pub struct QueueConfig { pub struct QueueConfig {
@ -274,11 +261,6 @@ pub struct QueueConfig {
/// complete the method it will raise a channel or connection /// complete the method it will raise a channel or connection
/// exception. /// exception.
pub no_wait: bool, pub no_wait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
pub arguments: Option<amqp::Table>,
} }
pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> { pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
@ -314,48 +296,22 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amq
Ok(session) Ok(session)
} }
pub trait TypedWrappers { pub trait ChannelExt {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError> type Error;
where fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
T: amqp::Consumer + 'static; fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>;
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>;
fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError>;
fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError>;
} }
impl TypedWrappers for amqp::Channel { pub trait ConsumerExt<T> {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError> type Error;
where fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>;
T: amqp::Consumer + 'static, }
{
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new),
)
}
fn declare_exchange( impl ChannelExt for amqp::Channel {
&mut self, type Error = amqp::AMQPError;
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError> { fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error> {
self.exchange_declare( self.exchange_declare(
config.exchange, config.exchange,
config.exchange_type.into(), config.exchange_type.into(),
@ -364,14 +320,12 @@ impl TypedWrappers for amqp::Channel {
config.auto_delete, config.auto_delete,
config.internal, config.internal,
config.no_wait, config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new), amqp::Table::new(),
) )?;
Ok(())
} }
fn declare_queue( fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error> {
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError> {
self.queue_declare( self.queue_declare(
config.queue, config.queue,
config.passive, config.passive,
@ -379,20 +333,37 @@ impl TypedWrappers for amqp::Channel {
config.exclusive, config.exclusive,
config.auto_delete, config.auto_delete,
config.no_wait, config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new), amqp::Table::new(),
) )?;
Ok(())
} }
fn bind_queue( fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error> {
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError> {
self.queue_bind( self.queue_bind(
config.queue, config.queue,
config.exchange, config.exchange,
config.routing_key.unwrap_or_else(|| "".to_owned()), config.routing_key.unwrap_or_else(|| "".to_owned()),
config.no_wait, config.no_wait,
config.arguments.unwrap_or_else(amqp::Table::new), amqp::Table::new(),
) )?;
Ok(())
}
}
impl<T: amqp::Consumer + 'static> ConsumerExt<T> for amqp::Channel {
type Error = amqp::AMQPError;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> {
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
amqp::Table::new(),
)?;
Ok(())
} }
} }