Move heartbeats to a task / message

This commit is contained in:
Graham Christensen 2017-11-10 19:22:43 -05:00
parent 76c1a342b9
commit 72018565dd
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
7 changed files with 125 additions and 62 deletions

View file

@ -22,6 +22,7 @@ use ofborg::config;
use ofborg::checkout;
use ofborg::worker;
use ofborg::message::buildjob;
use ofborg::tasks;
use ofborg::nix;
@ -36,69 +37,10 @@ fn main() {
{
println!("About to open channel #1");
let mut hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1");
//queue: &str, passive, durable, exclusive, auto_delete, nowait, arguments: Table
match hbchan.queue_declare("", false, false, true, true, false, Table::new()) {
Err(problem) => {
println!("Failed to declare a queue: {:?}", problem);
process::exit(1);
}
Ok(resp) => {
println!("Got personal queue: {:?}", resp);
let queueName = resp.queue;
hbchan.basic_publish(
"",
queueName.as_ref(),
true,
false,
BasicProperties {
content_type: Some("text".to_owned()),
..Default::default()
},
(b"Hello from rust!").to_vec()
).unwrap();
let qnameC = queueName.clone();
thread::spawn(move || {
hbchan.basic_consume(
move |chan: &mut Channel, deliver: Deliver, headers: BasicProperties, data: Vec<u8>|
{
chan.basic_publish(
"",
qnameC.as_ref(),
true,
false,
BasicProperties {
content_type: Some("text".to_owned()),
..Default::default()
},
(b"Hello from rust!").to_vec()
).unwrap();
let ten_sec = time::Duration::from_secs(1);
thread::sleep(ten_sec);
println!("Got a plastic heartbeat <3");
chan.basic_ack(deliver.delivery_tag, false).unwrap();
},
queueName,
String::from("hbchanner1"),
false,
false,
false,
false,
Table::new()
);
hbchan.start_consuming();
println!("Erm... in the heartbaets");
process::exit(1);
});
}
}
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap();

View file

@ -14,6 +14,7 @@ pub mod clone;
pub mod worker;
pub mod config;
pub mod message;
pub mod tasks;
pub mod nix;
pub mod ofborg {
@ -23,5 +24,6 @@ pub mod ofborg {
pub use clone;
pub use worker;
pub use message;
pub use tasks;
pub use nix;
}

View file

@ -2,7 +2,7 @@ use ofborg::message::{Pr,Repo};
use ofborg::message::buildresult;
use ofborg::worker;
use serde_json;
use amqp::{Channel, protocol};
use amqp::protocol;
#[derive(Serialize, Deserialize, Debug)]
pub struct BuildJob {

View file

@ -1,5 +1,6 @@
mod common;
pub mod buildjob;
pub mod buildresult;
pub mod plasticheartbeat;
use self::common::{Pr,Repo};

View file

@ -0,0 +1,12 @@
extern crate amqp;
extern crate env_logger;
use serde_json;
#[derive(Serialize, Deserialize, Debug)]
pub struct PlasticHeartbeat {
}
pub fn from(data: &Vec<u8>) -> Result<PlasticHeartbeat, serde_json::error::Error> {
return serde_json::from_slice(&data);
}

View file

@ -0,0 +1,104 @@
use std::{thread, time};
use serde_json;
use ofborg::worker;
use ofborg::message::plasticheartbeat;
use amqp::Channel;
use amqp::Table;
use amqp::protocol::basic::{Deliver,BasicProperties};
use std::process;
use amqp::Basic;
struct PlasticHeartbeatWorker {
queue_name: String
}
impl PlasticHeartbeatWorker {
fn message(&self) -> worker::QueueMsg {
return worker::QueueMsg{
exchange: None,
routing_key: Some(self.queue_name.clone()),
mandatory: true,
immediate: false,
properties: None,
content: serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes()
};
}
}
impl worker::SimpleWorker for PlasticHeartbeatWorker {
type J = plasticheartbeat::PlasticHeartbeat;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties,
body: &Vec<u8>) -> Result<Self::J, String> {
return match plasticheartbeat::from(body) {
Ok(e) => { Ok(e) }
Err(e) => {
println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e);
}
}
}
fn consumer(&self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions {
thread::sleep(time::Duration::from_secs(10));
return vec![
worker::Action::Publish(self.message()),
worker::Action::NackRequeue
];
}
}
pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) {
let queue_name = hbchan.queue_declare(
"",
false, // passive
false, // durable
true, // exclusive
true, // auto_delete
false, //nowait
Table::new()
)
.expect("Failed to declare an anon queue for PlasticHeartbeats!")
.queue;
println!("Got personal queue: {:?}", queue_name);
hbchan.basic_publish(
"",
queue_name.as_ref(),
true, // mandatory
false, // immediate
BasicProperties {
..Default::default()
},
serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes()
).unwrap();
let worker = move ||
{
hbchan.basic_consume(
worker::new(
PlasticHeartbeatWorker{
queue_name: (&queue_name).clone()
}
),
queue_name,
String::from(consumer_name),
false,
false,
false,
false,
Table::new()
).unwrap();
hbchan.start_consuming();
println!("PlasticHeartbeat failed");
process::exit(1);
};
thread::spawn(worker);
}

2
ofborg/src/tasks/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod heartbeat;