feat: statcheck worker

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-05 22:19:57 +01:00
parent b79a48bd73
commit ecdd6d52a6
2 changed files with 125 additions and 0 deletions

View file

@ -0,0 +1,79 @@
/// Statuses and checks worker
/// - will keep a database of changes
/// - their statuses
/// - their checks
/// - is VCS/CI agnostic
use std::env;
use std::error::Error;
use async_std::task;
use ofborg::config;
use ofborg::easyamqp;
use ofborg::easyamqp::ChannelExt;
use ofborg::easyamqp::ConsumerExt;
use ofborg::easylapin;
use ofborg::tasks;
use tracing::info;
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: statcheck-worker <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
// an RPC queue for verbs
let api_queue_name = "statcheck-api".to_owned();
// an event queue to be notified about statuses & checks changes.
let event_queue_name = "statcheck-events".to_owned();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: api_queue_name.clone(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;
chan.declare_queue(easyamqp::QueueConfig {
queue: api_queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: api_queue_name.clone(),
exchange: api_queue_name.clone(),
routing_key: None,
no_wait: false,
})?;
let handle = easylapin::WorkerChannel(chan).consume(
tasks::statcheck_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
easyamqp::ConsumeConfig {
queue: api_queue_name.clone(),
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)?;
info!("Waiting for API calls on {}", api_queue_name);
info!("Notifying of new changes on {}", event_queue_name);
task::block_on(handle);
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}

View file

@ -0,0 +1,46 @@
use std::path::PathBuf;
use tracing::{debug_span, error};
use crate::worker;
#[derive(Serialize, Deserialize, Debug)]
enum StatusCheckRPCMessage {
ListStatuses,
ListChecks,
}
#[allow(dead_code)]
struct StatusCheckCollector {
db_path: PathBuf,
}
// RPC API worker
impl worker::SimpleWorker for StatusCheckCollector {
type J = StatusCheckRPCMessage;
fn msg_to_job(
&mut self,
_method: &str,
_headers: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => {
error!(
"Failed to deserialize StatusCheckRPCMessage: {:?}",
String::from_utf8(body.to_vec())
);
panic!("{:?}", e);
}
}
}
fn consumer(&mut self, job: &Self::J) -> worker::Actions {
let span = debug_span!("command");
let _enter = span.enter();
vec![worker::Action::Ack]
}
}