Add publish support, and nack is now nackdump / nackrequeue
This commit is contained in:
parent
3d0f728d92
commit
26288b459e
|
@ -37,14 +37,14 @@ impl Actions {
|
||||||
|
|
||||||
return vec![
|
return vec![
|
||||||
worker::Action::Publish(worker::QueueMsg{
|
worker::Action::Publish(worker::QueueMsg{
|
||||||
exchange: Some("build-results".to_owned()),
|
exchange: Some("build-results-x".to_owned()),
|
||||||
routing_key: None,
|
routing_key: None,
|
||||||
mandatory: true,
|
mandatory: true,
|
||||||
immediate: true,
|
immediate: true,
|
||||||
properties: Some(props),
|
properties: Some(props),
|
||||||
content: serde_json::to_string(&msg).unwrap().into_bytes()
|
content: serde_json::to_string(&msg).unwrap().into_bytes()
|
||||||
}),
|
}),
|
||||||
worker::Action::Ack
|
worker::Action::NackRequeue
|
||||||
];
|
];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
|
use amqp::Basic;
|
||||||
use amqp::{Consumer, Channel};
|
use amqp::{Consumer, Channel};
|
||||||
use amqp::protocol::basic::{Deliver,BasicProperties};
|
use amqp::protocol::basic::{Deliver,BasicProperties};
|
||||||
use std::marker::Send;
|
use std::marker::Send;
|
||||||
|
@ -15,7 +15,8 @@ pub type Actions = Vec<Action>;
|
||||||
|
|
||||||
pub enum Action {
|
pub enum Action {
|
||||||
Ack,
|
Ack,
|
||||||
Nack,
|
NackRequeue,
|
||||||
|
NackDump,
|
||||||
Publish(QueueMsg),
|
Publish(QueueMsg),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,7 +49,7 @@ pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> {
|
||||||
|
|
||||||
impl <T: SimpleWorker + Send> Consumer for Worker<T> {
|
impl <T: SimpleWorker + Send> Consumer for Worker<T> {
|
||||||
fn handle_delivery(&mut self,
|
fn handle_delivery(&mut self,
|
||||||
_: &mut Channel,
|
channel: &mut Channel,
|
||||||
method: Deliver,
|
method: Deliver,
|
||||||
headers: BasicProperties,
|
headers: BasicProperties,
|
||||||
body: Vec<u8>) {
|
body: Vec<u8>) {
|
||||||
|
@ -56,9 +57,26 @@ impl <T: SimpleWorker + Send> Consumer for Worker<T> {
|
||||||
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
|
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
|
||||||
for action in self.internal.consumer(&job) {
|
for action in self.internal.consumer(&job) {
|
||||||
match action {
|
match action {
|
||||||
Action::Ack => {}
|
Action::Ack => {
|
||||||
Action::Nack => {}
|
channel.basic_ack(method.delivery_tag, false).unwrap();
|
||||||
Action::Publish(_) => {}
|
}
|
||||||
|
Action::NackRequeue => {
|
||||||
|
channel.basic_nack(method.delivery_tag, false, true).unwrap();
|
||||||
|
}
|
||||||
|
Action::NackDump => {
|
||||||
|
channel.basic_nack(method.delivery_tag, false, false).unwrap();
|
||||||
|
}
|
||||||
|
Action::Publish(msg) => {
|
||||||
|
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()});
|
||||||
|
channel.basic_publish(
|
||||||
|
msg.exchange.unwrap_or("".to_owned()),
|
||||||
|
msg.routing_key.unwrap_or("".to_owned()),
|
||||||
|
msg.mandatory,
|
||||||
|
msg.immediate,
|
||||||
|
props,
|
||||||
|
msg.content
|
||||||
|
).unwrap();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue