Add a typed wrapper for declaring a queue

This commit is contained in:
Graham Christensen 2018-01-31 19:19:40 -05:00
parent 23cb108a45
commit 3086bdc0e2
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C

View file

@ -201,6 +201,88 @@ pub struct ExchangeConfig {
pub arguments: Option<amqp::Table>,
}
pub struct QueueConfig {
/// The queue name MAY be empty, in which case the server MUST
/// create a new queue with a unique generated name and return
/// this to the client in the Declare-Ok method.
///
/// Queue names starting with "amq." are reserved for pre-declared
/// and standardised queues. The client MAY declare a queue
/// starting with "amq." if the passive option is set, or the
/// queue already exists. Error code: access-refused
///
/// The queue name can be empty, or a sequence of these
/// characters: letters, digits, hyphen, underscore, period, or
/// colon. Error code: precondition-failed
pub queue: String,
/// If set, the server will reply with Declare-Ok if the queue
/// already exists with the same name, and raise an error if not.
/// The client can use this to check whether a queue exists
/// without modifying the server state. When set, all other
/// method fields except name and no-wait are ignored. A declare
/// with both passive and no-wait has no effect. Arguments are
/// compared for semantic equivalence.
///
/// The client MAY ask the server to assert that a queue exists
/// without creating the queue if not. If the queue does not
/// exist, the server treats this as a failure. Error code:
/// not-found
///
/// If not set and the queue exists, the server MUST check that
/// the existing queue has the same values for durable, exclusive,
/// auto-delete, and arguments fields. The server MUST respond
/// with Declare-Ok if the requested queue matches these fields,
/// and MUST raise a channel exception if not.
pub passive: bool,
/// If set when creating a new queue, the queue will be marked as
/// durable. Durable queues remain active when a server restarts.
/// Non-durable queues (transient queues) are purged if/when a
/// server restarts. Note that durable queues do not necessarily
/// hold persistent messages, although it does not make sense to
/// send persistent messages to a transient queue.
///
/// The server MUST recreate the durable queue after a restart.
///
/// The server MUST support both durable and transient queues.
pub durable: bool,
/// Exclusive queues may only be accessed by the current
/// connection, and are deleted when that connection closes.
/// Passive declaration of an exclusive queue by other connections
/// are not allowed.
///
/// The server MUST support both exclusive (private) and
/// non-exclusive (shared) queues.
/// The client MAY NOT attempt to use a queue that was declared as
/// exclusive by another still-open connection. Error code:
/// resource-locked
pub exclusive: bool,
/// If set, the queue is deleted when all consumers have finished
/// using it. The last consumer can be cancelled either explicitly
/// or because its channel is closed. If there was no consumer
/// ever on the queue, it won't be deleted. Applications can
/// explicitly delete auto-delete queues using the Delete method
/// as normal.
///
/// The server MUST ignore the auto-delete field if the queue
/// already exists.
pub auto_delete: bool,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
pub arguments: Option<amqp::Table>,
}
pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
let scheme = if config.ssl {
amqp::AMQPScheme::AMQPS
@ -244,6 +326,11 @@ pub trait TypedWrappers {
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>;
fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError>;
fn bind_queue(
&mut self,
config: BindQueueConfig,
@ -284,6 +371,23 @@ impl TypedWrappers for amqp::Channel {
)
}
fn declare_queue(
&mut self,
config: QueueConfig,
) -> Result<amqp::protocol::queue::DeclareOk, amqp::AMQPError>
{
self.queue_declare(
config.queue,
config.passive,
config.durable,
config.exclusive,
config.auto_delete,
config.no_wait,
config.arguments.unwrap_or(amqp::Table::new()),
)
}
fn bind_queue(
&mut self,
config: BindQueueConfig,