feat: statcheck worker
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
parent
46b6519fc8
commit
3265bef2eb
2 changed files with 125 additions and 0 deletions
79
ofborg/src/bin/statcheck-worker.rs
Normal file
79
ofborg/src/bin/statcheck-worker.rs
Normal file
|
@ -0,0 +1,79 @@
|
|||
/// Statuses and checks worker
|
||||
/// - will keep a database of changes
|
||||
/// - their statuses
|
||||
/// - their checks
|
||||
/// - is VCS/CI agnostic
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
|
||||
use async_std::task;
|
||||
use ofborg::config;
|
||||
use ofborg::easyamqp;
|
||||
use ofborg::easyamqp::ChannelExt;
|
||||
use ofborg::easyamqp::ConsumerExt;
|
||||
use ofborg::easylapin;
|
||||
use ofborg::tasks;
|
||||
use tracing::info;
|
||||
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: statcheck-worker <config>");
|
||||
let cfg = config::load(arg.as_ref());
|
||||
|
||||
let conn = easylapin::from_config(&cfg.rabbitmq)?;
|
||||
let mut chan = task::block_on(conn.create_channel())?;
|
||||
|
||||
// an RPC queue for verbs
|
||||
let api_queue_name = "statcheck-api".to_owned();
|
||||
// an event queue to be notified about statuses & checks changes.
|
||||
let event_queue_name = "statcheck-events".to_owned();
|
||||
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: api_queue_name.clone(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})?;
|
||||
|
||||
chan.declare_queue(easyamqp::QueueConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
passive: false,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
})?;
|
||||
|
||||
chan.bind_queue(easyamqp::BindQueueConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
exchange: api_queue_name.clone(),
|
||||
routing_key: None,
|
||||
no_wait: false,
|
||||
})?;
|
||||
|
||||
let handle = easylapin::WorkerChannel(chan).consume(
|
||||
tasks::statcheck_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
|
||||
easyamqp::ConsumeConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
|
||||
no_local: false,
|
||||
no_ack: false,
|
||||
no_wait: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)?;
|
||||
|
||||
info!("Waiting for API calls on {}", api_queue_name);
|
||||
info!("Notifying of new changes on {}", event_queue_name);
|
||||
task::block_on(handle);
|
||||
|
||||
drop(conn); // Close connection.
|
||||
info!("Closed the session... EOF");
|
||||
Ok(())
|
||||
}
|
46
ofborg/src/tasks/status_check_collector.rs
Normal file
46
ofborg/src/tasks/status_check_collector.rs
Normal file
|
@ -0,0 +1,46 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use tracing::{debug_span, error};
|
||||
|
||||
use crate::worker;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
enum StatusCheckRPCMessage {
|
||||
ListStatuses,
|
||||
ListChecks,
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
struct StatusCheckCollector {
|
||||
db_path: PathBuf,
|
||||
}
|
||||
|
||||
// RPC API worker
|
||||
impl worker::SimpleWorker for StatusCheckCollector {
|
||||
type J = StatusCheckRPCMessage;
|
||||
|
||||
fn msg_to_job(
|
||||
&mut self,
|
||||
_method: &str,
|
||||
_headers: &Option<String>,
|
||||
body: &[u8],
|
||||
) -> Result<Self::J, String> {
|
||||
match serde_json::from_slice(body) {
|
||||
Ok(e) => Ok(e),
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to deserialize StatusCheckRPCMessage: {:?}",
|
||||
String::from_utf8(body.to_vec())
|
||||
);
|
||||
panic!("{:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn consumer(&mut self, job: &Self::J) -> worker::Actions {
|
||||
let span = debug_span!("command");
|
||||
let _enter = span.enter();
|
||||
|
||||
vec![worker::Action::Ack]
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue