diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 3295e48..e514bea 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -31,6 +31,7 @@ pub mod commitstatus; pub mod outpathdiff; pub mod tagger; pub mod asynccmd; +pub mod notifyworker; pub mod ofborg { pub use asynccmd; diff --git a/ofborg/src/notifyworker.rs b/ofborg/src/notifyworker.rs new file mode 100644 index 0000000..7d14aad --- /dev/null +++ b/ofborg/src/notifyworker.rs @@ -0,0 +1,92 @@ +use amqp::Basic; +use amqp::{Consumer, Channel}; +use amqp::protocol::basic::{Deliver,BasicProperties}; +use std::marker::Send; +use worker::Action; + +pub struct NotifyWorker { + internal: T, +} + +pub trait SimpleNotifyWorker { + type J; + type N; + + fn consumer(&self, job: &Self::J, notifier: &Self::N); + + fn notifier(&self, channel: &NotificationReceiver) -> &Self::N; + + fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties, + body: &Vec) -> Result; +} + +pub trait NotificationReceiver { + fn tell(&mut self, action: Action); +} + +pub struct ChannelNotificationReceiver<'a> { + channel: &'a mut Channel, + delivery_tag: u64, +} + +impl<'a> ChannelNotificationReceiver<'a> { + fn new(channel: &'a mut Channel, delivery_tag: u64) -> ChannelNotificationReceiver<'a> { + return ChannelNotificationReceiver{ + channel: channel, + delivery_tag: delivery_tag, + }; + } +} + +impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> { + fn tell(&mut self, action: Action) { + match action { + Action::Ack => { + self.channel.basic_ack(self.delivery_tag, false).unwrap(); + } + Action::NackRequeue => { + self.channel.basic_nack(self.delivery_tag, false, true).unwrap(); + } + Action::NackDump => { + self.channel.basic_nack(self.delivery_tag, false, false).unwrap(); + } + Action::Publish(msg) => { + let exch = msg.exchange.clone().unwrap_or("".to_owned()); + let key = msg.routing_key.clone().unwrap_or("".to_owned()); + + let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()}); + self.channel.basic_publish( + exch, + key, + msg.mandatory, + msg.immediate, + props, + msg.content + ).unwrap(); + } + } + } +} + +pub fn new(worker: T) -> NotifyWorker { + return NotifyWorker{ + internal: worker, + }; +} + +impl Consumer for NotifyWorker { + fn handle_delivery(&mut self, + channel: &mut Channel, + method: Deliver, + headers: BasicProperties, + body: Vec) { + let receiver = ChannelNotificationReceiver::new( + channel, + method.delivery_tag + ); + let notifier = self.internal.notifier(&receiver); + + let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); + self.internal.consumer(&job, ¬ifier); + } +}