Add typed wrapper for queue bindings and fix the exchange declaration

This commit is contained in:
Graham Christensen 2018-01-31 18:12:23 -05:00
parent 031156dc30
commit 23cb108a45
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C

View file

@ -48,6 +48,54 @@ pub struct ConsumeConfig {
pub arguments: Option<amqp::Table>,
}
pub struct BindQueueConfig {
/// Specifies the name of the queue to bind.
///
/// The client MUST either specify a queue name or have previously
/// declared a queue on the same channel Error code: not-found
///
/// The client MUST NOT attempt to bind a queue that does not
/// exist. Error code: not-found
pub queue: String,
/// Name of the exchange to bind to.
///
/// A client MUST NOT be allowed to bind a queue to a non-existent
/// exchange. Error code: not-found
///
/// The server MUST accept a blank exchange name to mean the
/// default exchange.
pub exchange: String,
/// Specifies the routing key for the binding. The routing key is
/// used for routing messages depending on the exchange
/// configuration. Not all exchanges use a routing key - refer to
/// the specific exchange documentation. If the queue name is
/// empty, the server uses the last queue declared on the channel.
/// If the routing key is also empty, the server uses this queue
/// name for the routing key as well. If the queue name is
/// provided but the routing key is empty, the server does the
/// binding with that empty routing key. The meaning of empty
/// routing keys depends on the exchange implementation.
///
/// If a message queue binds to a direct exchange using routing
/// key K and a publisher sends the exchange a message with
/// routing key R, then the message MUST be passed to the message
/// queue if K = R.
pub routing_key: Option<String>,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,
/// A set of arguments for the binding. The syntax and semantics
/// of these arguments depends on the exchange class.
pub arguments: Option<amqp::Table>,
}
pub enum ExchangeType {
Topic,
Headers,
@ -78,7 +126,7 @@ pub struct ExchangeConfig {
/// The exchange name consists of a non-empty sequence of these
/// characters: letters, digits, hyphen, underscore, period, or
/// colon. Error code: precondition-failed
exchange: String,
pub exchange: String,
/// Each exchange belongs to one of a set of exchange types
/// implemented by the server. The exchange types define the
@ -93,7 +141,7 @@ pub struct ExchangeConfig {
///
/// The client MUST NOT attempt to declare an exchange with a type
/// that the server does not support. Error code: command-invalid
exchange_type: ExchangeType,
pub exchange_type: ExchangeType,
/// If set, the server will reply with Declare-Ok if the exchange
/// already exists with the same name, and raise an error if not.
@ -112,7 +160,7 @@ pub struct ExchangeConfig {
/// and arguments fields. The server MUST respond with Declare-Ok
/// if the requested exchange matches these fields, and MUST raise
/// a channel exception if not.
passive: bool,
pub passive: bool,
/// If set when creating a new exchange, the exchange will be
/// marked as durable. Durable exchanges remain active when a
@ -120,7 +168,7 @@ pub struct ExchangeConfig {
/// are purged if/when a server restarts.
///
/// The server MUST support both durable and transient exchanges.
durable: bool,
pub durable: bool,
/// If set, the exchange is deleted when all queues have finished
/// using it.
@ -134,23 +182,23 @@ pub struct ExchangeConfig {
///
/// The server MUST ignore the auto-delete field if the exchange
/// already exists.
auto_delete: bool,
pub auto_delete: bool,
/// If set, the exchange may not be used directly by publishers,
/// but only when bound to other exchanges. Internal exchanges are
/// used to construct wiring that is not visible to applications.
internal: bool,
pub internal: bool,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
nowait: bool,
pub no_wait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
arguments: Option<amqp::Table>,
pub arguments: Option<amqp::Table>,
}
pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
@ -191,12 +239,15 @@ pub trait TypedWrappers {
where
T: amqp::Consumer + 'static;
fn declare_exchange<T>(
fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where
T: amqp::Consumer + 'static;
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>;
fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError>;
}
impl TypedWrappers for amqp::Channel {
@ -216,12 +267,10 @@ impl TypedWrappers for amqp::Channel {
)
}
fn declare_exchange<T>(
fn declare_exchange(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where
T: amqp::Consumer + 'static,
{
self.exchange_declare(
config.exchange,
@ -230,7 +279,21 @@ impl TypedWrappers for amqp::Channel {
config.durable,
config.auto_delete,
config.internal,
config.nowait,
config.no_wait,
config.arguments.unwrap_or(amqp::Table::new()),
)
}
fn bind_queue(
&mut self,
config: BindQueueConfig,
) -> Result<amqp::protocol::queue::BindOk, amqp::AMQPError>
{
self.queue_bind(
config.queue,
config.exchange,
config.routing_key.unwrap_or("".to_owned()),
config.no_wait,
config.arguments.unwrap_or(amqp::Table::new()),
)
}