Add a worker that can return things during run time

This commit is contained in:
Graham Christensen 2018-01-11 19:45:22 -05:00
parent e4db4e8a2f
commit bd5fa9bcc0
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
2 changed files with 93 additions and 0 deletions

View file

@ -31,6 +31,7 @@ pub mod commitstatus;
pub mod outpathdiff;
pub mod tagger;
pub mod asynccmd;
pub mod notifyworker;
pub mod ofborg {
pub use asynccmd;

View file

@ -0,0 +1,92 @@
use amqp::Basic;
use amqp::{Consumer, Channel};
use amqp::protocol::basic::{Deliver,BasicProperties};
use std::marker::Send;
use worker::Action;
pub struct NotifyWorker<T: SimpleNotifyWorker> {
internal: T,
}
pub trait SimpleNotifyWorker {
type J;
type N;
fn consumer(&self, job: &Self::J, notifier: &Self::N);
fn notifier(&self, channel: &NotificationReceiver) -> &Self::N;
fn msg_to_job(&self, method: &Deliver, headers: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String>;
}
pub trait NotificationReceiver {
fn tell(&mut self, action: Action);
}
pub struct ChannelNotificationReceiver<'a> {
channel: &'a mut Channel,
delivery_tag: u64,
}
impl<'a> ChannelNotificationReceiver<'a> {
fn new(channel: &'a mut Channel, delivery_tag: u64) -> ChannelNotificationReceiver<'a> {
return ChannelNotificationReceiver{
channel: channel,
delivery_tag: delivery_tag,
};
}
}
impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
fn tell(&mut self, action: Action) {
match action {
Action::Ack => {
self.channel.basic_ack(self.delivery_tag, false).unwrap();
}
Action::NackRequeue => {
self.channel.basic_nack(self.delivery_tag, false, true).unwrap();
}
Action::NackDump => {
self.channel.basic_nack(self.delivery_tag, false, false).unwrap();
}
Action::Publish(msg) => {
let exch = msg.exchange.clone().unwrap_or("".to_owned());
let key = msg.routing_key.clone().unwrap_or("".to_owned());
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()});
self.channel.basic_publish(
exch,
key,
msg.mandatory,
msg.immediate,
props,
msg.content
).unwrap();
}
}
}
}
pub fn new<T: SimpleNotifyWorker>(worker: T) -> NotifyWorker<T> {
return NotifyWorker{
internal: worker,
};
}
impl <T: SimpleNotifyWorker + Send> Consumer for NotifyWorker<T> {
fn handle_delivery(&mut self,
channel: &mut Channel,
method: Deliver,
headers: BasicProperties,
body: Vec<u8>) {
let receiver = ChannelNotificationReceiver::new(
channel,
method.delivery_tag
);
let notifier = self.internal.notifier(&receiver);
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
self.internal.consumer(&job, &notifier);
}
}