Add a typed wrapper to exchange declarations

This commit is contained in:
Graham Christensen 2018-01-30 22:23:21 -05:00
parent c0cab43513
commit 634f2bb51f
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C

View file

@ -48,6 +48,111 @@ pub struct ConsumeConfig {
pub arguments: Option<amqp::Table>, pub arguments: Option<amqp::Table>,
} }
pub enum ExchangeType {
Topic,
Headers,
Fanout,
Direct,
Custom(String),
}
impl Into<String> for ExchangeType {
fn into(self) -> String {
match self {
ExchangeType::Topic => "topic".to_owned(),
ExchangeType::Headers => "headers".to_owned(),
ExchangeType::Fanout => "fanout".to_owned(),
ExchangeType::Direct => "direct".to_owned(),
ExchangeType::Custom(x) => x,
}
}
}
pub struct ExchangeConfig {
/// Exchange names starting with "amq." are reserved for
/// pre-declared and standardised exchanges. The client MAY
/// declare an exchange starting with "amq." if the passive option
/// is set, or the exchange already exists. Error code:
/// access-refused
///
/// The exchange name consists of a non-empty sequence of these
/// characters: letters, digits, hyphen, underscore, period, or
/// colon. Error code: precondition-failed
exchange: String,
/// Each exchange belongs to one of a set of exchange types
/// implemented by the server. The exchange types define the
/// functionality of the exchange - i.e. how messages are routed
/// through it. It is not valid or meaningful to attempt to change
/// the type of an existing exchange.
///
/// Exchanges cannot be redeclared with different types. The
/// client MUST not attempt to redeclare an existing exchange with
/// a different type than used in the original Exchange.Declare
/// method. Error code: not-allowed
///
/// The client MUST NOT attempt to declare an exchange with a type
/// that the server does not support. Error code: command-invalid
exchange_type: ExchangeType,
/// If set, the server will reply with Declare-Ok if the exchange
/// already exists with the same name, and raise an error if not.
/// The client can use this to check whether an exchange exists
/// without modifying the server state. When set, all other method
/// fields except name and no-wait are ignored. A declare with
/// both passive and no-wait has no effect. Arguments are compared
/// for semantic equivalence.
///
/// If set, and the exchange does not already exist, the server
/// MUST raise a channel exception with reply code 404 (not
/// found).
///
/// If not set and the exchange exists, the server MUST check that
/// the existing exchange has the same values for type, durable,
/// and arguments fields. The server MUST respond with Declare-Ok
/// if the requested exchange matches these fields, and MUST raise
/// a channel exception if not.
passive: bool,
/// If set when creating a new exchange, the exchange will be
/// marked as durable. Durable exchanges remain active when a
/// server restarts. Non-durable exchanges (transient exchanges)
/// are purged if/when a server restarts.
///
/// The server MUST support both durable and transient exchanges.
durable: bool,
/// If set, the exchange is deleted when all queues have finished
/// using it.
///
/// The server SHOULD allow for a reasonable delay between the
/// point when it determines that an exchange is not being used
/// (or no longer used), and the point when it deletes the
/// exchange. At the least it must allow a client to create an
/// exchange and then bind a queue to it, with a small but
/// non-zero delay between these two actions.
///
/// The server MUST ignore the auto-delete field if the exchange
/// already exists.
auto_delete: bool,
/// If set, the exchange may not be used directly by publishers,
/// but only when bound to other exchanges. Internal exchanges are
/// used to construct wiring that is not visible to applications.
internal: bool,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
nowait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
arguments: Option<amqp::Table>,
}
pub fn session_from_config(config: &RabbitMQConfig) pub fn session_from_config(config: &RabbitMQConfig)
-> Result<amqp::Session, amqp::AMQPError> { -> Result<amqp::Session, amqp::AMQPError> {
let scheme = if config.ssl { let scheme = if config.ssl {
@ -79,6 +184,11 @@ pub trait TypedWrappers {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) fn consume<T>(&mut self, callback: T, config: ConsumeConfig)
-> Result<String, amqp::AMQPError> -> Result<String, amqp::AMQPError>
where T: amqp::Consumer + 'static; where T: amqp::Consumer + 'static;
fn declare_exchange<T>(&mut self, config: ExchangeConfig)
-> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where T: amqp::Consumer + 'static;
} }
impl TypedWrappers for amqp::Channel { impl TypedWrappers for amqp::Channel {
@ -97,4 +207,21 @@ impl TypedWrappers for amqp::Channel {
config.arguments.unwrap_or(amqp::Table::new()), config.arguments.unwrap_or(amqp::Table::new()),
) )
} }
fn declare_exchange<T>(&mut self, config: ExchangeConfig)
-> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where T: amqp::Consumer + 'static
{
self.exchange_declare(
config.exchange,
config.exchange_type.into(),
config.passive,
config.durable,
config.auto_delete,
config.internal,
config.nowait,
config.arguments.unwrap_or(amqp::Table::new())
)
}
} }