feat: statcheck worker
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
parent
2cda3b8267
commit
68f88d40dc
79
ofborg/src/bin/statcheck-worker.rs
Normal file
79
ofborg/src/bin/statcheck-worker.rs
Normal file
|
@ -0,0 +1,79 @@
|
||||||
|
/// Statuses and checks worker
|
||||||
|
/// - will keep a database of changes
|
||||||
|
/// - their statuses
|
||||||
|
/// - their checks
|
||||||
|
/// - is VCS/CI agnostic
|
||||||
|
use std::env;
|
||||||
|
use std::error::Error;
|
||||||
|
|
||||||
|
use async_std::task;
|
||||||
|
use ofborg::config;
|
||||||
|
use ofborg::easyamqp;
|
||||||
|
use ofborg::easyamqp::ChannelExt;
|
||||||
|
use ofborg::easyamqp::ConsumerExt;
|
||||||
|
use ofborg::easylapin;
|
||||||
|
use ofborg::tasks;
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
fn main() -> Result<(), Box<dyn Error>> {
|
||||||
|
ofborg::setup_log();
|
||||||
|
|
||||||
|
let arg = env::args()
|
||||||
|
.nth(1)
|
||||||
|
.expect("usage: statcheck-worker <config>");
|
||||||
|
let cfg = config::load(arg.as_ref());
|
||||||
|
|
||||||
|
let conn = easylapin::from_config(&cfg.rabbitmq)?;
|
||||||
|
let mut chan = task::block_on(conn.create_channel())?;
|
||||||
|
|
||||||
|
// an RPC queue for verbs
|
||||||
|
let api_queue_name = "statcheck-api".to_owned();
|
||||||
|
// an event queue to be notified about statuses & checks changes.
|
||||||
|
let event_queue_name = "statcheck-events".to_owned();
|
||||||
|
|
||||||
|
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||||
|
exchange: api_queue_name.clone(),
|
||||||
|
exchange_type: easyamqp::ExchangeType::Topic,
|
||||||
|
passive: false,
|
||||||
|
durable: true,
|
||||||
|
auto_delete: false,
|
||||||
|
no_wait: false,
|
||||||
|
internal: false,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
chan.declare_queue(easyamqp::QueueConfig {
|
||||||
|
queue: api_queue_name.clone(),
|
||||||
|
passive: false,
|
||||||
|
durable: true,
|
||||||
|
exclusive: false,
|
||||||
|
auto_delete: false,
|
||||||
|
no_wait: false,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
chan.bind_queue(easyamqp::BindQueueConfig {
|
||||||
|
queue: api_queue_name.clone(),
|
||||||
|
exchange: api_queue_name.clone(),
|
||||||
|
routing_key: None,
|
||||||
|
no_wait: false,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let handle = easylapin::WorkerChannel(chan).consume(
|
||||||
|
tasks::statcheck_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
|
||||||
|
easyamqp::ConsumeConfig {
|
||||||
|
queue: api_queue_name.clone(),
|
||||||
|
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
|
||||||
|
no_local: false,
|
||||||
|
no_ack: false,
|
||||||
|
no_wait: false,
|
||||||
|
exclusive: false,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
info!("Waiting for API calls on {}", api_queue_name);
|
||||||
|
info!("Notifying of new changes on {}", event_queue_name);
|
||||||
|
task::block_on(handle);
|
||||||
|
|
||||||
|
drop(conn); // Close connection.
|
||||||
|
info!("Closed the session... EOF");
|
||||||
|
Ok(())
|
||||||
|
}
|
46
ofborg/src/tasks/status_check_collector.rs
Normal file
46
ofborg/src/tasks/status_check_collector.rs
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use tracing::{debug_span, error};
|
||||||
|
|
||||||
|
use crate::worker;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
|
enum StatusCheckRPCMessage {
|
||||||
|
ListStatuses,
|
||||||
|
ListChecks,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(dead_code)]
|
||||||
|
struct StatusCheckCollector {
|
||||||
|
db_path: PathBuf,
|
||||||
|
}
|
||||||
|
|
||||||
|
// RPC API worker
|
||||||
|
impl worker::SimpleWorker for StatusCheckCollector {
|
||||||
|
type J = StatusCheckRPCMessage;
|
||||||
|
|
||||||
|
fn msg_to_job(
|
||||||
|
&mut self,
|
||||||
|
_method: &str,
|
||||||
|
_headers: &Option<String>,
|
||||||
|
body: &[u8],
|
||||||
|
) -> Result<Self::J, String> {
|
||||||
|
match serde_json::from_slice(body) {
|
||||||
|
Ok(e) => Ok(e),
|
||||||
|
Err(e) => {
|
||||||
|
error!(
|
||||||
|
"Failed to deserialize StatusCheckRPCMessage: {:?}",
|
||||||
|
String::from_utf8(body.to_vec())
|
||||||
|
);
|
||||||
|
panic!("{:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consumer(&mut self, job: &Self::J) -> worker::Actions {
|
||||||
|
let span = debug_span!("command");
|
||||||
|
let _enter = span.enter();
|
||||||
|
|
||||||
|
vec![worker::Action::Ack]
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue