feat: add a simple gerrit event filter
This worker transforms a native Gerrit stream event into a OfBorg VCS generic stream event. Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
parent
c8605f7429
commit
2b9a366b24
3 changed files with 153 additions and 0 deletions
86
ofborg/src/bin/gerrit-generic-vcs-filter.rs
Normal file
86
ofborg/src/bin/gerrit-generic-vcs-filter.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
/// This converts a Gerrit event to a OfBorg's VCS general event.
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
|
||||
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
|
||||
use ofborg::easylapin;
|
||||
use ofborg::tasks;
|
||||
|
||||
use ofborg::config;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: gerrit-generic-vcs-filter <config>");
|
||||
let cfg = config::load(arg.as_ref());
|
||||
|
||||
let conn = easylapin::from_config(&cfg.rabbitmq).await?;
|
||||
let mut chan = conn.create_channel().await?;
|
||||
|
||||
// Destination exchange: `vcs-events`
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: "vcs-events".to_owned(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Source exchange: `gerrit-events`
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: "gerrit-events".to_owned(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Source queue: `gerrit-generic-filter`
|
||||
let queue_name = String::from("gerrit-generic-filter");
|
||||
chan.declare_queue(easyamqp::QueueConfig {
|
||||
queue: queue_name.clone(),
|
||||
passive: false,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
chan.bind_queue(easyamqp::BindQueueConfig {
|
||||
queue: queue_name.clone(),
|
||||
exchange: "gerrit-events".to_owned(),
|
||||
routing_key: Some("gerrit.events.*".to_owned()),
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
tracing::info!("Fetching jobs from '{}'", &queue_name);
|
||||
easylapin::WorkerChannel(chan)
|
||||
.consume(
|
||||
tasks::gerrit::GerritEventFilterWorker::new(),
|
||||
easyamqp::ConsumeConfig {
|
||||
queue: queue_name.clone(),
|
||||
consumer_tag: format!("{}-gerrit-generic-vcs-filter", cfg.whoami()),
|
||||
no_local: false,
|
||||
no_ack: false,
|
||||
no_wait: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(conn);
|
||||
tracing::info!("Closed the session... EOF");
|
||||
|
||||
Ok(())
|
||||
}
|
66
ofborg/src/tasks/gerrit/mod.rs
Normal file
66
ofborg/src/tasks/gerrit/mod.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
use crate::vcs::gerrit::data_structures::GerritStreamEvent;
|
||||
use crate::worker;
|
||||
use async_trait::async_trait;
|
||||
use tracing::debug_span;
|
||||
|
||||
/// This transforms a Gerrit event into a OfBorg event.
|
||||
|
||||
pub struct GerritEventFilterWorker;
|
||||
|
||||
impl GerritEventFilterWorker {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl worker::SimpleWorker for GerritEventFilterWorker {
|
||||
type J = GerritStreamEvent;
|
||||
|
||||
async fn msg_to_job(
|
||||
&mut self,
|
||||
_: &str,
|
||||
_: &Option<String>,
|
||||
body: &[u8],
|
||||
) -> Result<Self::J, String> {
|
||||
match serde_json::from_slice(body) {
|
||||
Ok(e) => Ok(e),
|
||||
Err(e) => Err(format!(
|
||||
"Failed to deserialize event {:?}: {:?}",
|
||||
e,
|
||||
String::from_utf8(body.to_vec())
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn consumer(
|
||||
&mut self,
|
||||
_chan: &mut lapin::Channel,
|
||||
evt: &GerritStreamEvent,
|
||||
) -> worker::Actions {
|
||||
let span = debug_span!("event");
|
||||
let _enter = span.enter();
|
||||
|
||||
match evt {
|
||||
GerritStreamEvent::DroppedOutput => {
|
||||
tracing::warn!("Gerrit reported loss of output!");
|
||||
// TODO: Update the stat worker about this.
|
||||
}
|
||||
// Transform this into a "new pull request"
|
||||
// Change restored -> rerun CI on it if not already.
|
||||
// Exitting WIP -> run CI if not already
|
||||
GerritStreamEvent::PatchSetCreated { .. } => {}
|
||||
// Change abandoned or deleted or updated -> Change no longer relevant
|
||||
// Entering into WIP -> Change no longer relevant
|
||||
GerritStreamEvent::ChangeAbandoned { .. } => {}
|
||||
// TODO: CommentAdded / HashtagsChanged / TopicChanged
|
||||
// propagate them for chatops support and tagging manipulations
|
||||
// TODO: ChangeMerged
|
||||
// dispatch something to support periodic jobs or further workflows after a commit is
|
||||
// merged, e.g. performance CI, etc.
|
||||
_ => {}
|
||||
}
|
||||
|
||||
vec![worker::Action::Ack]
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ pub mod build;
|
|||
pub mod eval;
|
||||
pub mod evaluate;
|
||||
pub mod evaluationfilter;
|
||||
pub mod gerrit;
|
||||
pub mod log_message_collector;
|
||||
pub mod pastebin_collector;
|
||||
pub mod statscollector;
|
||||
|
|
Loading…
Reference in a new issue