Begin accepting build results on the log queues

This commit is contained in:
Graham Christensen 2018-03-10 18:32:23 -05:00
parent a796200325
commit 4f9258c3b6
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
2 changed files with 27 additions and 3 deletions

View file

@ -189,12 +189,20 @@ impl<'a, 'b> JobActions<'a, 'b> {
let result_exchange = self.result_exchange.clone();
let result_routing_key = self.result_routing_key.clone();
self.tell(worker::publish_serde_action(
result_exchange,
result_routing_key,
&msg,
));
let log_exchange = self.log_exchange.clone();
let log_routing_key = self.log_routing_key.clone();
self.tell(worker::publish_serde_action(
log_exchange,
log_routing_key,
&msg,
));
self.tell(worker::Action::Ack);
}
@ -216,12 +224,20 @@ impl<'a, 'b> JobActions<'a, 'b> {
let result_exchange = self.result_exchange.clone();
let result_routing_key = self.result_routing_key.clone();
self.tell(worker::publish_serde_action(
result_exchange,
result_routing_key,
&msg,
));
let log_exchange = self.log_exchange.clone();
let log_routing_key = self.log_routing_key.clone();
self.tell(worker::publish_serde_action(
log_exchange,
log_routing_key,
&msg,
));
self.tell(worker::Action::Ack);
}

View file

@ -10,6 +10,7 @@ use std::io::Write;
use ofborg::writetoline::LineWriter;
use ofborg::message::buildlogmsg::{BuildLogStart, BuildLogMsg};
use ofborg::message::buildresult::BuildResult;
use ofborg::worker;
use amqp::protocol::basic::{Deliver, BasicProperties};
@ -28,6 +29,7 @@ pub struct LogMessageCollector {
enum MsgType {
Start(BuildLogStart),
Msg(BuildLogMsg),
Finish(BuildResult),
}
#[derive(Debug)]
@ -175,10 +177,16 @@ impl worker::SimpleWorker for LogMessageCollector {
if let Ok(msg) = decode_msg {
attempt_id = msg.attempt_id.clone();
message = MsgType::Start(msg);
} else {
let decode_msg: Result<BuildResult, _> = serde_json::from_slice(body);
if let Ok(msg) = decode_msg {
attempt_id = msg.attempt_id.clone();
message = MsgType::Finish(msg);
} else {
return Err(format!("failed to decode job: {:?}", decode_msg));
}
}
}
return Ok(LogMessage {
from: LogFrom {