WIP: generalize for Gerrit / Floral #3

Draft
raito wants to merge 33 commits from vcs-generalization into main
4 changed files with 139 additions and 0 deletions
Showing only changes of commit c922e6e455 - Show all commits

View file

@ -0,0 +1,79 @@
/// Statuses and checks worker
/// - will keep a database of changes
/// - their statuses
/// - their checks
/// - is VCS/CI agnostic
use std::env;
use std::error::Error;
use async_std::task;
use ofborg::config;
use ofborg::easyamqp;
use ofborg::easyamqp::ChannelExt;
use ofborg::easyamqp::ConsumerExt;
use ofborg::easylapin;
use ofborg::tasks;
use tracing::info;
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: statcheck-worker <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
// an RPC queue for verbs
let api_queue_name = "statcheck-api".to_owned();
// an event queue to be notified about statuses & checks changes.
let event_queue_name = "statcheck-events".to_owned();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: api_queue_name.clone(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;
chan.declare_queue(easyamqp::QueueConfig {
queue: api_queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: api_queue_name.clone(),
exchange: api_queue_name.clone(),
routing_key: None,
no_wait: false,
})?;
let handle = easylapin::WorkerChannel(chan).consume(
tasks::status_check_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
easyamqp::ConsumeConfig {
queue: api_queue_name.clone(),
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)?;
info!("Waiting for API calls on {}", api_queue_name);
info!("Notifying of new changes on {}", event_queue_name);
task::block_on(handle);
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}

View file

@ -26,6 +26,7 @@ pub struct Config {
pub nix: NixConfig,
pub rabbitmq: RabbitMqConfig,
pub vcs: VCSConfig,
pub statcheck: StatusCheckConfig,
pub pastebin: PastebinConfig,
pub log_storage: Option<LogStorage>,
@ -42,6 +43,12 @@ pub struct FeedbackConfig {
pub full_logs: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StatusCheckConfig {
#[serde(deserialize_with = "deserialize_and_expand_pathbuf")]
pub db: PathBuf,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PastebinConfig {
#[serde(deserialize_with = "deserialize_and_expand_pathbuf")]

View file

@ -7,3 +7,4 @@ pub mod githubcommentposter;
pub mod log_message_collector;
pub mod pastebin_collector;
pub mod statscollector;
pub mod status_check_collector;

View file

@ -0,0 +1,52 @@
use std::path::PathBuf;
use tracing::{debug_span, error};
use crate::worker;
#[derive(Serialize, Deserialize, Debug)]
pub enum StatusCheckRPCMessage {
ListStatuses,
ListChecks,
}
#[allow(dead_code)]
pub struct StatusCheckCollector {
db_path: PathBuf,
}
impl StatusCheckCollector {
pub fn new(db_path: PathBuf) -> Self {
Self { db_path }
}
}
// RPC API worker
impl worker::SimpleWorker for StatusCheckCollector {
type J = StatusCheckRPCMessage;
fn msg_to_job(
&mut self,
_method: &str,
_headers: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => {
error!(
"Failed to deserialize StatusCheckRPCMessage: {:?}",
String::from_utf8(body.to_vec())
);
panic!("{:?}", e);
}
}
}
fn consumer(&mut self, _chan: &mut lapin::Channel, _job: &Self::J) -> worker::Actions {
let span = debug_span!("command");
let _enter = span.enter();
vec![worker::Action::Ack]
}
}