diff --git a/ofborg/src/bin/statcheck-worker.rs b/ofborg/src/bin/statcheck-worker.rs new file mode 100644 index 0000000..d6f32df --- /dev/null +++ b/ofborg/src/bin/statcheck-worker.rs @@ -0,0 +1,79 @@ +/// Statuses and checks worker +/// - will keep a database of changes +/// - their statuses +/// - their checks +/// - is VCS/CI agnostic +use std::env; +use std::error::Error; + +use async_std::task; +use ofborg::config; +use ofborg::easyamqp; +use ofborg::easyamqp::ChannelExt; +use ofborg::easyamqp::ConsumerExt; +use ofborg::easylapin; +use ofborg::tasks; +use tracing::info; + +fn main() -> Result<(), Box> { + ofborg::setup_log(); + + let arg = env::args() + .nth(1) + .expect("usage: statcheck-worker "); + let cfg = config::load(arg.as_ref()); + + let conn = easylapin::from_config(&cfg.rabbitmq)?; + let mut chan = task::block_on(conn.create_channel())?; + + // an RPC queue for verbs + let api_queue_name = "statcheck-api".to_owned(); + // an event queue to be notified about statuses & checks changes. + let event_queue_name = "statcheck-events".to_owned(); + + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: api_queue_name.clone(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + })?; + + chan.declare_queue(easyamqp::QueueConfig { + queue: api_queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + })?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: api_queue_name.clone(), + exchange: api_queue_name.clone(), + routing_key: None, + no_wait: false, + })?; + + let handle = easylapin::WorkerChannel(chan).consume( + tasks::statcheck_collector::StatusCheckCollector::new(cfg.statcheck.clone().db), + easyamqp::ConsumeConfig { + queue: api_queue_name.clone(), + consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + )?; + + info!("Waiting for API calls on {}", api_queue_name); + info!("Notifying of new changes on {}", event_queue_name); + task::block_on(handle); + + drop(conn); // Close connection. + info!("Closed the session... EOF"); + Ok(()) +} diff --git a/ofborg/src/tasks/status_check_collector.rs b/ofborg/src/tasks/status_check_collector.rs new file mode 100644 index 0000000..b3d48b3 --- /dev/null +++ b/ofborg/src/tasks/status_check_collector.rs @@ -0,0 +1,46 @@ +use std::path::PathBuf; + +use tracing::{debug_span, error}; + +use crate::worker; + +#[derive(Serialize, Deserialize, Debug)] +enum StatusCheckRPCMessage { + ListStatuses, + ListChecks, +} + +#[allow(dead_code)] +struct StatusCheckCollector { + db_path: PathBuf, +} + +// RPC API worker +impl worker::SimpleWorker for StatusCheckCollector { + type J = StatusCheckRPCMessage; + + fn msg_to_job( + &mut self, + _method: &str, + _headers: &Option, + body: &[u8], + ) -> Result { + match serde_json::from_slice(body) { + Ok(e) => Ok(e), + Err(e) => { + error!( + "Failed to deserialize StatusCheckRPCMessage: {:?}", + String::from_utf8(body.to_vec()) + ); + panic!("{:?}", e); + } + } + } + + fn consumer(&mut self, job: &Self::J) -> worker::Actions { + let span = debug_span!("command"); + let _enter = span.enter(); + + vec![worker::Action::Ack] + } +}