diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 93dd555..e4ff311 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -22,6 +22,7 @@ use ofborg::config; use ofborg::checkout; use ofborg::worker; use ofborg::message::buildjob; +use ofborg::tasks; use ofborg::nix; @@ -36,69 +37,10 @@ fn main() { { println!("About to open channel #1"); let mut hbchan = session.open_channel(1).unwrap(); + println!("Opened channel #1"); - //queue: &str, passive, durable, exclusive, auto_delete, nowait, arguments: Table - match hbchan.queue_declare("", false, false, true, true, false, Table::new()) { - Err(problem) => { - println!("Failed to declare a queue: {:?}", problem); - process::exit(1); - } - Ok(resp) => { - println!("Got personal queue: {:?}", resp); - - let queueName = resp.queue; - hbchan.basic_publish( - "", - queueName.as_ref(), - true, - false, - BasicProperties { - content_type: Some("text".to_owned()), - ..Default::default() - }, - (b"Hello from rust!").to_vec() - ).unwrap(); - - let qnameC = queueName.clone(); - - thread::spawn(move || { - hbchan.basic_consume( - move |chan: &mut Channel, deliver: Deliver, headers: BasicProperties, data: Vec| - { - chan.basic_publish( - "", - qnameC.as_ref(), - true, - false, - BasicProperties { - content_type: Some("text".to_owned()), - ..Default::default() - }, - (b"Hello from rust!").to_vec() - ).unwrap(); - - let ten_sec = time::Duration::from_secs(1); - thread::sleep(ten_sec); - println!("Got a plastic heartbeat <3"); - chan.basic_ack(deliver.delivery_tag, false).unwrap(); - - }, - queueName, - String::from("hbchanner1"), - false, - false, - false, - false, - Table::new() - ); - - hbchan.start_consuming(); - println!("Erm... in the heartbaets"); - process::exit(1); - }); - } - } + tasks::heartbeat::start_on_channel(hbchan, cfg.whoami()); } let mut channel = session.open_channel(2).unwrap(); diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 90788e1..b114607 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -14,6 +14,7 @@ pub mod clone; pub mod worker; pub mod config; pub mod message; +pub mod tasks; pub mod nix; pub mod ofborg { @@ -23,5 +24,6 @@ pub mod ofborg { pub use clone; pub use worker; pub use message; + pub use tasks; pub use nix; } diff --git a/ofborg/src/message/buildjob.rs b/ofborg/src/message/buildjob.rs index 35b83e7..e606312 100644 --- a/ofborg/src/message/buildjob.rs +++ b/ofborg/src/message/buildjob.rs @@ -2,7 +2,7 @@ use ofborg::message::{Pr,Repo}; use ofborg::message::buildresult; use ofborg::worker; use serde_json; -use amqp::{Channel, protocol}; +use amqp::protocol; #[derive(Serialize, Deserialize, Debug)] pub struct BuildJob { diff --git a/ofborg/src/message/mod.rs b/ofborg/src/message/mod.rs index bb305a2..231428a 100644 --- a/ofborg/src/message/mod.rs +++ b/ofborg/src/message/mod.rs @@ -1,5 +1,6 @@ mod common; pub mod buildjob; pub mod buildresult; +pub mod plasticheartbeat; use self::common::{Pr,Repo}; diff --git a/ofborg/src/message/plasticheartbeat.rs b/ofborg/src/message/plasticheartbeat.rs new file mode 100644 index 0000000..704b64a --- /dev/null +++ b/ofborg/src/message/plasticheartbeat.rs @@ -0,0 +1,12 @@ +extern crate amqp; +extern crate env_logger; + +use serde_json; + +#[derive(Serialize, Deserialize, Debug)] +pub struct PlasticHeartbeat { +} + +pub fn from(data: &Vec) -> Result { + return serde_json::from_slice(&data); +} diff --git a/ofborg/src/tasks/heartbeat.rs b/ofborg/src/tasks/heartbeat.rs new file mode 100644 index 0000000..0efccd1 --- /dev/null +++ b/ofborg/src/tasks/heartbeat.rs @@ -0,0 +1,104 @@ + + +use std::{thread, time}; +use serde_json; +use ofborg::worker; +use ofborg::message::plasticheartbeat; +use amqp::Channel; +use amqp::Table; +use amqp::protocol::basic::{Deliver,BasicProperties}; +use std::process; +use amqp::Basic; + +struct PlasticHeartbeatWorker { + queue_name: String +} + +impl PlasticHeartbeatWorker { + fn message(&self) -> worker::QueueMsg { + return worker::QueueMsg{ + exchange: None, + routing_key: Some(self.queue_name.clone()), + mandatory: true, + immediate: false, + properties: None, + content: serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes() + }; + } + +} + +impl worker::SimpleWorker for PlasticHeartbeatWorker { + type J = plasticheartbeat::PlasticHeartbeat; + + fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, + body: &Vec) -> Result { + return match plasticheartbeat::from(body) { + Ok(e) => { Ok(e) } + Err(e) => { + println!("{:?}", String::from_utf8(body.clone())); + panic!("{:?}", e); + } + } + } + + fn consumer(&self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions { + thread::sleep(time::Duration::from_secs(10)); + + return vec![ + worker::Action::Publish(self.message()), + worker::Action::NackRequeue + ]; + } +} + +pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) { + let queue_name = hbchan.queue_declare( + "", + false, // passive + false, // durable + true, // exclusive + true, // auto_delete + false, //nowait + Table::new() + ) + .expect("Failed to declare an anon queue for PlasticHeartbeats!") + .queue; + + println!("Got personal queue: {:?}", queue_name); + + hbchan.basic_publish( + "", + queue_name.as_ref(), + true, // mandatory + false, // immediate + BasicProperties { + ..Default::default() + }, + serde_json::to_string(&plasticheartbeat::PlasticHeartbeat{}).unwrap().into_bytes() + ).unwrap(); + + let worker = move || + { + hbchan.basic_consume( + worker::new( + PlasticHeartbeatWorker{ + queue_name: (&queue_name).clone() + } + ), + queue_name, + String::from(consumer_name), + false, + false, + false, + false, + Table::new() + ).unwrap(); + + hbchan.start_consuming(); + println!("PlasticHeartbeat failed"); + process::exit(1); + }; + + thread::spawn(worker); +} diff --git a/ofborg/src/tasks/mod.rs b/ofborg/src/tasks/mod.rs new file mode 100644 index 0000000..9640783 --- /dev/null +++ b/ofborg/src/tasks/mod.rs @@ -0,0 +1,2 @@ + +pub mod heartbeat;