From 2b9a366b24d56041ed887cba662dd41e921786b6 Mon Sep 17 00:00:00 2001 From: Raito Bezarius Date: Sun, 15 Dec 2024 22:11:15 +0100 Subject: [PATCH] feat: add a simple gerrit event filter This worker transforms a native Gerrit stream event into a OfBorg VCS generic stream event. Signed-off-by: Raito Bezarius --- ofborg/src/bin/gerrit-generic-vcs-filter.rs | 86 +++++++++++++++++++++ ofborg/src/tasks/gerrit/mod.rs | 66 ++++++++++++++++ ofborg/src/tasks/mod.rs | 1 + 3 files changed, 153 insertions(+) create mode 100644 ofborg/src/bin/gerrit-generic-vcs-filter.rs create mode 100644 ofborg/src/tasks/gerrit/mod.rs diff --git a/ofborg/src/bin/gerrit-generic-vcs-filter.rs b/ofborg/src/bin/gerrit-generic-vcs-filter.rs new file mode 100644 index 0000000..a7a4b99 --- /dev/null +++ b/ofborg/src/bin/gerrit-generic-vcs-filter.rs @@ -0,0 +1,86 @@ +/// This converts a Gerrit event to a OfBorg's VCS general event. +use std::env; +use std::error::Error; + +use ofborg::easyamqp::{self, ChannelExt, ConsumerExt}; +use ofborg::easylapin; +use ofborg::tasks; + +use ofborg::config; + +#[tokio::main] +async fn main() -> Result<(), Box> { + ofborg::setup_log(); + + let arg = env::args() + .nth(1) + .expect("usage: gerrit-generic-vcs-filter "); + let cfg = config::load(arg.as_ref()); + + let conn = easylapin::from_config(&cfg.rabbitmq).await?; + let mut chan = conn.create_channel().await?; + + // Destination exchange: `vcs-events` + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "vcs-events".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + // Source exchange: `gerrit-events` + chan.declare_exchange(easyamqp::ExchangeConfig { + exchange: "gerrit-events".to_owned(), + exchange_type: easyamqp::ExchangeType::Topic, + passive: false, + durable: true, + auto_delete: false, + no_wait: false, + internal: false, + }) + .await?; + + // Source queue: `gerrit-generic-filter` + let queue_name = String::from("gerrit-generic-filter"); + chan.declare_queue(easyamqp::QueueConfig { + queue: queue_name.clone(), + passive: false, + durable: true, + exclusive: false, + auto_delete: false, + no_wait: false, + }) + .await?; + + chan.bind_queue(easyamqp::BindQueueConfig { + queue: queue_name.clone(), + exchange: "gerrit-events".to_owned(), + routing_key: Some("gerrit.events.*".to_owned()), + no_wait: false, + }) + .await?; + + tracing::info!("Fetching jobs from '{}'", &queue_name); + easylapin::WorkerChannel(chan) + .consume( + tasks::gerrit::GerritEventFilterWorker::new(), + easyamqp::ConsumeConfig { + queue: queue_name.clone(), + consumer_tag: format!("{}-gerrit-generic-vcs-filter", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + }, + ) + .await?; + + drop(conn); + tracing::info!("Closed the session... EOF"); + + Ok(()) +} diff --git a/ofborg/src/tasks/gerrit/mod.rs b/ofborg/src/tasks/gerrit/mod.rs new file mode 100644 index 0000000..8a47be9 --- /dev/null +++ b/ofborg/src/tasks/gerrit/mod.rs @@ -0,0 +1,66 @@ +use crate::vcs::gerrit::data_structures::GerritStreamEvent; +use crate::worker; +use async_trait::async_trait; +use tracing::debug_span; + +/// This transforms a Gerrit event into a OfBorg event. + +pub struct GerritEventFilterWorker; + +impl GerritEventFilterWorker { + pub fn new() -> Self { + Self {} + } +} + +#[async_trait] +impl worker::SimpleWorker for GerritEventFilterWorker { + type J = GerritStreamEvent; + + async fn msg_to_job( + &mut self, + _: &str, + _: &Option, + body: &[u8], + ) -> Result { + match serde_json::from_slice(body) { + Ok(e) => Ok(e), + Err(e) => Err(format!( + "Failed to deserialize event {:?}: {:?}", + e, + String::from_utf8(body.to_vec()) + )), + } + } + + async fn consumer( + &mut self, + _chan: &mut lapin::Channel, + evt: &GerritStreamEvent, + ) -> worker::Actions { + let span = debug_span!("event"); + let _enter = span.enter(); + + match evt { + GerritStreamEvent::DroppedOutput => { + tracing::warn!("Gerrit reported loss of output!"); + // TODO: Update the stat worker about this. + } + // Transform this into a "new pull request" + // Change restored -> rerun CI on it if not already. + // Exitting WIP -> run CI if not already + GerritStreamEvent::PatchSetCreated { .. } => {} + // Change abandoned or deleted or updated -> Change no longer relevant + // Entering into WIP -> Change no longer relevant + GerritStreamEvent::ChangeAbandoned { .. } => {} + // TODO: CommentAdded / HashtagsChanged / TopicChanged + // propagate them for chatops support and tagging manipulations + // TODO: ChangeMerged + // dispatch something to support periodic jobs or further workflows after a commit is + // merged, e.g. performance CI, etc. + _ => {} + } + + vec![worker::Action::Ack] + } +} diff --git a/ofborg/src/tasks/mod.rs b/ofborg/src/tasks/mod.rs index 34917ed..68c82d9 100644 --- a/ofborg/src/tasks/mod.rs +++ b/ofborg/src/tasks/mod.rs @@ -2,6 +2,7 @@ pub mod build; pub mod eval; pub mod evaluate; pub mod evaluationfilter; +pub mod gerrit; pub mod log_message_collector; pub mod pastebin_collector; pub mod statscollector;