diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 8db5be9..b449468 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -40,7 +40,7 @@ fn main() { let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let nix = cfg.nix(); - let events = stats::RabbitMQ::new( + let events = stats::RabbitMQ::from_amqp( &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), session.open_channel(3).unwrap(), ); diff --git a/ofborg/src/bin/stats.rs b/ofborg/src/bin/stats.rs index 06a4ffc..bcf3dd1 100644 --- a/ofborg/src/bin/stats.rs +++ b/ofborg/src/bin/stats.rs @@ -17,7 +17,7 @@ fn main() { let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); info!("Connected to rabbitmq"); - let events = stats::RabbitMQ::new( + let events = stats::RabbitMQ::from_amqp( &format!("{}-{}", cfg.runner.identity.clone(), cfg.nix.system.clone()), session.open_channel(3).unwrap(), ); diff --git a/ofborg/src/stats.rs b/ofborg/src/stats.rs index c9bde24..97bccef 100644 --- a/ofborg/src/stats.rs +++ b/ofborg/src/stats.rs @@ -1,5 +1,8 @@ -use amqp::protocol::basic::BasicProperties; -use amqp::{Basic, Channel}; +use amqp::protocol::basic; +use amqp::Basic; +use async_std::task; +use lapin::options::BasicPublishOptions; +use lapin::CloseOnDrop; include!(concat!(env!("OUT_DIR"), "/events.rs")); @@ -19,13 +22,13 @@ pub struct EventMessage { pub events: Vec, } -pub struct RabbitMQ { +pub struct RabbitMQ { identity: String, - channel: Channel, + channel: C, } -impl RabbitMQ { - pub fn new(identity: &str, channel: Channel) -> RabbitMQ { +impl RabbitMQ { + pub fn from_amqp(identity: &str, channel: amqp::Channel) -> Self { RabbitMQ { identity: identity.to_owned(), channel, @@ -33,9 +36,9 @@ impl RabbitMQ { } } -impl SysEvents for RabbitMQ { +impl SysEvents for RabbitMQ { fn notify(&mut self, event: Event) { - let props = BasicProperties { + let props = basic::BasicProperties { ..Default::default() }; self.channel @@ -55,3 +58,38 @@ impl SysEvents for RabbitMQ { .unwrap(); } } + +impl RabbitMQ> { + pub fn from_lapin(identity: &str, channel: CloseOnDrop) -> Self { + RabbitMQ { + identity: identity.to_owned(), + channel, + } + } +} + +impl SysEvents for RabbitMQ> { + fn notify(&mut self, event: Event) { + let props = lapin::BasicProperties::default().with_content_type("application/json".into()); + task::block_on(async { + let _confirmaton = self + .channel + .basic_publish( + &String::from("stats"), + &"".to_owned(), + BasicPublishOptions::default(), + serde_json::to_string(&EventMessage { + sender: self.identity.clone(), + events: vec![event], + }) + .unwrap() + .into_bytes(), + props, + ) + .await + .unwrap() + .await + .unwrap(); + }); + } +}