From 634f2bb51fba80c67059b48fe4fc76357e1d8b6a Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 22:23:21 -0500 Subject: [PATCH] Add a typed wrapper to exchange declarations --- ofborg/src/easyamqp.rs | 127 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index ac43b75..9ccaf78 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -48,6 +48,111 @@ pub struct ConsumeConfig { pub arguments: Option, } +pub enum ExchangeType { + Topic, + Headers, + Fanout, + Direct, + Custom(String), +} + +impl Into for ExchangeType { + fn into(self) -> String { + match self { + ExchangeType::Topic => "topic".to_owned(), + ExchangeType::Headers => "headers".to_owned(), + ExchangeType::Fanout => "fanout".to_owned(), + ExchangeType::Direct => "direct".to_owned(), + ExchangeType::Custom(x) => x, + } + } +} + +pub struct ExchangeConfig { + /// Exchange names starting with "amq." are reserved for + /// pre-declared and standardised exchanges. The client MAY + /// declare an exchange starting with "amq." if the passive option + /// is set, or the exchange already exists. Error code: + /// access-refused + /// + /// The exchange name consists of a non-empty sequence of these + /// characters: letters, digits, hyphen, underscore, period, or + /// colon. Error code: precondition-failed + exchange: String, + + /// Each exchange belongs to one of a set of exchange types + /// implemented by the server. The exchange types define the + /// functionality of the exchange - i.e. how messages are routed + /// through it. It is not valid or meaningful to attempt to change + /// the type of an existing exchange. + /// + /// Exchanges cannot be redeclared with different types. The + /// client MUST not attempt to redeclare an existing exchange with + /// a different type than used in the original Exchange.Declare + /// method. Error code: not-allowed + /// + /// The client MUST NOT attempt to declare an exchange with a type + /// that the server does not support. Error code: command-invalid + exchange_type: ExchangeType, + + /// If set, the server will reply with Declare-Ok if the exchange + /// already exists with the same name, and raise an error if not. + /// The client can use this to check whether an exchange exists + /// without modifying the server state. When set, all other method + /// fields except name and no-wait are ignored. A declare with + /// both passive and no-wait has no effect. Arguments are compared + /// for semantic equivalence. + /// + /// If set, and the exchange does not already exist, the server + /// MUST raise a channel exception with reply code 404 (not + /// found). + /// + /// If not set and the exchange exists, the server MUST check that + /// the existing exchange has the same values for type, durable, + /// and arguments fields. The server MUST respond with Declare-Ok + /// if the requested exchange matches these fields, and MUST raise + /// a channel exception if not. + passive: bool, + + /// If set when creating a new exchange, the exchange will be + /// marked as durable. Durable exchanges remain active when a + /// server restarts. Non-durable exchanges (transient exchanges) + /// are purged if/when a server restarts. + /// + /// The server MUST support both durable and transient exchanges. + durable: bool, + + /// If set, the exchange is deleted when all queues have finished + /// using it. + /// + /// The server SHOULD allow for a reasonable delay between the + /// point when it determines that an exchange is not being used + /// (or no longer used), and the point when it deletes the + /// exchange. At the least it must allow a client to create an + /// exchange and then bind a queue to it, with a small but + /// non-zero delay between these two actions. + /// + /// The server MUST ignore the auto-delete field if the exchange + /// already exists. + auto_delete: bool, + + /// If set, the exchange may not be used directly by publishers, + /// but only when bound to other exchanges. Internal exchanges are + /// used to construct wiring that is not visible to applications. + internal: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + nowait: bool, + + /// A set of arguments for the declaration. The syntax and + /// semantics of these arguments depends on the server + /// implementation. + arguments: Option, +} + pub fn session_from_config(config: &RabbitMQConfig) -> Result { let scheme = if config.ssl { @@ -79,6 +184,11 @@ pub trait TypedWrappers { fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result where T: amqp::Consumer + 'static; + + fn declare_exchange(&mut self, config: ExchangeConfig) + -> Result + where T: amqp::Consumer + 'static; + } impl TypedWrappers for amqp::Channel { @@ -97,4 +207,21 @@ impl TypedWrappers for amqp::Channel { config.arguments.unwrap_or(amqp::Table::new()), ) } + + fn declare_exchange(&mut self, config: ExchangeConfig) + -> Result + where T: amqp::Consumer + 'static + { + self.exchange_declare( + config.exchange, + config.exchange_type.into(), + config.passive, + config.durable, + config.auto_delete, + config.internal, + config.nowait, + config.arguments.unwrap_or(amqp::Table::new()) + ) + } + }