diff --git a/ofborg/src/message/buildjob.rs b/ofborg/src/message/buildjob.rs index 2643c8c..8e5ccd0 100644 --- a/ofborg/src/message/buildjob.rs +++ b/ofborg/src/message/buildjob.rs @@ -37,14 +37,14 @@ impl Actions { return vec![ worker::Action::Publish(worker::QueueMsg{ - exchange: Some("build-results".to_owned()), + exchange: Some("build-results-x".to_owned()), routing_key: None, mandatory: true, immediate: true, properties: Some(props), content: serde_json::to_string(&msg).unwrap().into_bytes() }), - worker::Action::Ack + worker::Action::NackRequeue ]; } } diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 0ade66e..f846d0a 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -1,4 +1,4 @@ - +use amqp::Basic; use amqp::{Consumer, Channel}; use amqp::protocol::basic::{Deliver,BasicProperties}; use std::marker::Send; @@ -15,7 +15,8 @@ pub type Actions = Vec; pub enum Action { Ack, - Nack, + NackRequeue, + NackDump, Publish(QueueMsg), } @@ -48,7 +49,7 @@ pub fn new(worker: T) -> Worker { impl Consumer for Worker { fn handle_delivery(&mut self, - _: &mut Channel, + channel: &mut Channel, method: Deliver, headers: BasicProperties, body: Vec) { @@ -56,9 +57,26 @@ impl Consumer for Worker { let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); for action in self.internal.consumer(&job) { match action { - Action::Ack => {} - Action::Nack => {} - Action::Publish(_) => {} + Action::Ack => { + channel.basic_ack(method.delivery_tag, false).unwrap(); + } + Action::NackRequeue => { + channel.basic_nack(method.delivery_tag, false, true).unwrap(); + } + Action::NackDump => { + channel.basic_nack(method.delivery_tag, false, false).unwrap(); + } + Action::Publish(msg) => { + let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()}); + channel.basic_publish( + msg.exchange.unwrap_or("".to_owned()), + msg.routing_key.unwrap_or("".to_owned()), + msg.mandatory, + msg.immediate, + props, + msg.content + ).unwrap(); + } } } }