feat: add a simple gerrit event filter

This worker transforms a native Gerrit stream event into a OfBorg VCS
generic stream event.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-12-15 22:11:15 +01:00
parent c8605f7429
commit 6012726778
3 changed files with 160 additions and 0 deletions

View file

@ -0,0 +1,86 @@
/// This converts a Gerrit event to a OfBorg's VCS general event.
use std::env;
use std::error::Error;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::tasks;
use ofborg::config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: gerrit-generic-vcs-filter <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq).await?;
let mut chan = conn.create_channel().await?;
// Destination exchange: `vcs-events`
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "vcs-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.await?;
// Source exchange: `gerrit-events`
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "gerrit-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.await?;
// Source queue: `gerrit-generic-filter`
let queue_name = String::from("gerrit-generic-filter");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.await?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "gerrit-events".to_owned(),
routing_key: Some("gerrit.events.*".to_owned()),
no_wait: false,
})
.await?;
tracing::info!("Fetching jobs from '{}'", &queue_name);
easylapin::WorkerChannel(chan)
.consume(
tasks::gerrit::GerritEventFilterWorker::new(),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-gerrit-generic-vcs-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.await?;
drop(conn);
tracing::info!("Closed the session... EOF");
Ok(())
}

View file

@ -0,0 +1,73 @@
use crate::vcs::gerrit::data_structures::GerritStreamEvent;
use crate::worker;
use async_trait::async_trait;
use tracing::debug_span;
/// This transforms a Gerrit event into a OfBorg event.
pub struct GerritEventFilterWorker;
impl Default for GerritEventFilterWorker {
fn default() -> Self {
Self::new()
}
}
impl GerritEventFilterWorker {
#[must_use]
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl worker::SimpleWorker for GerritEventFilterWorker {
type J = GerritStreamEvent;
async fn msg_to_job(
&mut self,
_: &str,
_: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => Err(format!(
"Failed to deserialize event {:?}: {:?}",
e,
String::from_utf8(body.to_vec())
)),
}
}
async fn consumer(
&mut self,
_chan: &mut lapin::Channel,
evt: &GerritStreamEvent,
) -> worker::Actions {
let span = debug_span!("event");
let _enter = span.enter();
match evt {
GerritStreamEvent::DroppedOutput => {
tracing::warn!("Gerrit reported loss of output!");
// TODO: Update the stat worker about this.
}
// Transform this into a "new pull request"
// Change restored -> rerun CI on it if not already.
// Exitting WIP -> run CI if not already
GerritStreamEvent::PatchSetCreated { .. } => {}
// Change abandoned or deleted or updated -> Change no longer relevant
// Entering into WIP -> Change no longer relevant
GerritStreamEvent::ChangeAbandoned { .. } => {}
// TODO: CommentAdded / HashtagsChanged / TopicChanged
// propagate them for chatops support and tagging manipulations
// TODO: ChangeMerged
// dispatch something to support periodic jobs or further workflows after a commit is
// merged, e.g. performance CI, etc.
_ => {}
}
vec![worker::Action::Ack]
}
}

View file

@ -2,6 +2,7 @@ pub mod build;
pub mod eval;
pub mod evaluate;
pub mod evaluationfilter;
pub mod gerrit;
pub mod log_message_collector;
pub mod pastebin_collector;
pub mod statscollector;