Compare commits

...

5 commits

Author SHA1 Message Date
bb08c8cb97 feat(tasks/evaluation-filter): consume from VCSEvents
Simplified to the maximum, it consumes from VCSEvents in the case of
creation & update.

It does not handle cancellation yet.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-15 23:39:18 +01:00
6a628c3cdd feat: add basic event transformation from Gerrit to generic VCS ones
Those are very crude basic generalization of Gerrit specific events.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-15 23:10:00 +01:00
2b9a366b24 feat: add a simple gerrit event filter
This worker transforms a native Gerrit stream event into a OfBorg VCS
generic stream event.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-15 22:11:23 +01:00
c8605f7429 chore: remove listen-gerrit-events
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-15 22:10:48 +01:00
69f6ae8cc2 feat: finish gerrit-event-streamer
Drop the logging, use a proper routing key, add error reporting for
error cases.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-15 22:10:40 +01:00
8 changed files with 234 additions and 101 deletions

View file

@ -51,17 +51,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
)
.await;
let routing_key = "abc";
// TODO: add the specific instance name in multi-active contexts.
let routing_key = "gerrit.events";
let event_stream = gerrit_api.stream_events().await.unwrap();
pin_mut!(event_stream);
loop {
let raw_evt = event_stream.next().await;
tracing::debug!("{:?}", raw_evt);
match raw_evt {
Some(Ok(event)) => {
println!("{:#?}", event);
let queue_message =
prepare_queue_message(Some(exchange_name), Some(routing_key), &event);
let props = BasicProperties::default()
@ -86,11 +87,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
}
}
Some(Err(_err)) => {
// notify the event
Some(Err(err)) => {
tracing::error!("Failed to read a new Gerrit event: {}", err);
}
None => {
// notify the event
tracing::warn!("No event in the Gerrit stream, retrying...");
}
}
}

View file

@ -0,0 +1,86 @@
/// This converts a Gerrit event to a OfBorg's VCS general event.
use std::env;
use std::error::Error;
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
use ofborg::easylapin;
use ofborg::tasks;
use ofborg::config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: gerrit-generic-vcs-filter <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq).await?;
let mut chan = conn.create_channel().await?;
// Destination exchange: `vcs-events`
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "vcs-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.await?;
// Source exchange: `gerrit-events`
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: "gerrit-events".to_owned(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})
.await?;
// Source queue: `gerrit-generic-filter`
let queue_name = String::from("gerrit-generic-filter");
chan.declare_queue(easyamqp::QueueConfig {
queue: queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})
.await?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: queue_name.clone(),
exchange: "gerrit-events".to_owned(),
routing_key: Some("gerrit.events.*".to_owned()),
no_wait: false,
})
.await?;
tracing::info!("Fetching jobs from '{}'", &queue_name);
easylapin::WorkerChannel(chan)
.consume(
tasks::gerrit::GerritEventFilterWorker::new(),
easyamqp::ConsumeConfig {
queue: queue_name.clone(),
consumer_tag: format!("{}-gerrit-generic-vcs-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)
.await?;
drop(conn);
tracing::info!("Closed the session... EOF");
Ok(())
}

View file

@ -1,37 +0,0 @@
/// This is a Gerrit listener for events which puts them on stdout for debugging purposes.
/// The list of event type listened to is static.
use std::env;
use std::error::Error;
use futures::{pin_mut, StreamExt};
use ofborg::vcs::gerrit::ssh::GerritSSHApi;
use tracing::info;
use ofborg::config;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: listen-gerrit-events <config>");
let cfg = config::load(arg.as_ref());
let gerrit_cfg = cfg
.gerrit
.expect("Gerrit event streaming requires a Gerrit configuration");
let gerrit_ssh_uri = format!("ssh://{}:{}", gerrit_cfg.instance_uri, gerrit_cfg.ssh_port);
info!("Listening events from Gerrit on {}", gerrit_ssh_uri);
let mut gerrit_api = GerritSSHApi::new(gerrit_cfg.ssh_private_key_file, &gerrit_ssh_uri).await;
let event_stream = gerrit_api.stream_events().await.unwrap();
pin_mut!(event_stream);
loop {
let thing = event_stream.next().await;
println!("{:?}", thing);
if let Some(Ok(event)) = thing {
println!("{:#?}", event);
}
}
}

View file

@ -3,5 +3,6 @@ pub mod buildlogmsg;
pub mod buildresult;
mod common;
pub mod evaluationjob;
pub mod vcs;
pub use self::common::{Change, Repo};

12
ofborg/src/message/vcs.rs Normal file
View file

@ -0,0 +1,12 @@
/// VCS generic events
use serde::{Deserialize, Serialize};
use super::{Change, Repo};
#[derive(Serialize, Deserialize, Debug)]
pub enum VCSEvent {
ChangeCreated { change: Change, repo: Repo },
ChangeDeleted { change: Change, repo: Repo },
ChangeUpdated { change: Change, repo: Repo },
WIPStatusChange { change: Change, wip: bool },
}

View file

@ -1,6 +1,6 @@
use crate::acl;
use crate::ghevent;
use crate::message::{evaluationjob, Change, Repo};
use crate::message::evaluationjob;
use crate::message::vcs::VCSEvent;
use crate::worker;
use async_trait::async_trait;
@ -19,7 +19,7 @@ impl EvaluationFilterWorker {
#[async_trait]
impl worker::SimpleWorker for EvaluationFilterWorker {
type J = ghevent::PullRequestEvent;
type J = VCSEvent;
async fn msg_to_job(
&mut self,
@ -37,70 +37,24 @@ impl worker::SimpleWorker for EvaluationFilterWorker {
}
}
async fn consumer(
&mut self,
_chan: &mut lapin::Channel,
job: &ghevent::PullRequestEvent,
) -> worker::Actions {
let span = debug_span!("job", pr = ?job.number);
async fn consumer(&mut self, _chan: &mut lapin::Channel, job: &VCSEvent) -> worker::Actions {
let span = debug_span!("event", event = ?job);
let _enter = span.enter();
if !self.acl.is_repo_eligible(&job.repository.full_name) {
info!("Repo not authorized ({})", job.repository.full_name);
return vec![worker::Action::Ack];
}
if job.pull_request.state != ghevent::PullRequestState::Open {
info!(
"PR is not open ({}#{})",
job.repository.full_name, job.number
);
return vec![worker::Action::Ack];
}
let interesting: bool = match job.action {
ghevent::PullRequestAction::Opened
| ghevent::PullRequestAction::Synchronize
| ghevent::PullRequestAction::Reopened => true,
ghevent::PullRequestAction::Edited => {
if let Some(ref changes) = job.changes {
changes.base.is_some()
} else {
false
}
}
ghevent::PullRequestAction::Unknown => false,
let (change, repo) = match job {
VCSEvent::ChangeCreated { change, repo } => (change, repo),
VCSEvent::ChangeUpdated { change, repo } => (change, repo),
_ => return vec![worker::Action::Ack],
};
if !interesting {
info!(
"Not interesting: {}#{} because of {:?}",
job.repository.full_name, job.number, job.action
);
if !self.acl.is_repo_eligible(&repo.full_name) {
info!("Repo not authorized ({})", &repo.full_name);
return vec![worker::Action::Ack];
}
info!(
"Found {}#{} to be interesting because of {:?}",
job.repository.full_name, job.number, job.action
);
let repo_msg = Repo {
clone_url: job.repository.clone_url.clone(),
full_name: job.repository.full_name.clone(),
owner: job.repository.owner.login.clone(),
name: job.repository.name.clone(),
};
let change_msg = Change {
number: job.number,
head_sha: job.pull_request.head.sha.clone(),
target_branch: Some(job.pull_request.base.git_ref.clone()),
};
let msg = evaluationjob::EvaluationJob {
repo: repo_msg,
change: change_msg,
repo: repo.clone(),
change: change.clone(),
};
vec![

View file

@ -0,0 +1,115 @@
/// This transforms a Gerrit event into a OfBorg event.
use crate::message::vcs::VCSEvent;
use crate::vcs::gerrit::data_structures::GerritStreamEvent;
use crate::worker;
use async_trait::async_trait;
use tracing::debug_span;
pub struct GerritEventFilterWorker;
impl GerritEventFilterWorker {
pub fn new() -> Self {
Self {}
}
}
#[async_trait]
impl worker::SimpleWorker for GerritEventFilterWorker {
type J = GerritStreamEvent;
async fn msg_to_job(
&mut self,
_: &str,
_: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => Err(format!(
"Failed to deserialize event {:?}: {:?}",
e,
String::from_utf8(body.to_vec())
)),
}
}
async fn consumer(
&mut self,
_chan: &mut lapin::Channel,
evt: &GerritStreamEvent,
) -> worker::Actions {
let span = debug_span!("event");
let _enter = span.enter();
let evt: Option<VCSEvent> = match evt {
GerritStreamEvent::DroppedOutput => {
tracing::warn!("Gerrit reported loss of output!");
// TODO: Update the stat worker about this.
None
}
GerritStreamEvent::ChangeDeleted { change, .. }
| GerritStreamEvent::ChangeAbandoned { change, .. } => Some(VCSEvent::ChangeDeleted {
change: crate::message::Change {
target_branch: Some(change.branch.clone()),
number: change
.change_number
.expect("Change deletion payload should contain a CL number"),
head_sha: change.current_patch_set.as_ref().unwrap().r#ref.clone(),
},
repo: crate::message::Repo {
owner: "no one really".to_string(),
name: "nixpkgs".to_string(),
full_name: "what is even this field".to_owned(),
clone_url: "somewhere on the internet; why do you care?".to_owned(),
},
}),
GerritStreamEvent::ChangeRestored { change, .. }
| GerritStreamEvent::PatchSetCreated { change, .. } => Some(VCSEvent::ChangeCreated {
change: crate::message::Change {
target_branch: Some(change.branch.clone()),
number: change
.change_number
.expect("Patch set creation payload should contain a CL number"),
head_sha: change
.current_patch_set
.as_ref()
.expect("Patch set creation payload should contain a current patch set")
.r#ref
.clone(),
},
repo: crate::message::Repo {
owner: "no one really".to_string(),
name: "nixpkgs".to_string(),
full_name: "what is even this field".to_owned(),
clone_url: "somewhere on the internet; why do you care?".to_owned(),
},
}),
GerritStreamEvent::WorkInProgressStateChanged {
change, patch_set, ..
} => Some(VCSEvent::WIPStatusChange {
change: crate::message::Change {
target_branch: Some(change.branch.clone()),
number: change.change_number.unwrap(),
head_sha: patch_set.r#ref.clone(),
},
wip: change.wip.unwrap(),
}),
// TODO: CommentAdded / HashtagsChanged / TopicChanged
// propagate them for chatops support and tagging manipulations
// TODO: ChangeMerged
// dispatch something to support periodic jobs or further workflows after a commit is
// merged, e.g. performance CI, etc.
_ => None,
};
if let Some(evt) = evt {
vec![
worker::publish_serde_action(&None, &None, &evt),
worker::Action::Ack,
]
} else {
vec![worker::Action::Ack]
}
}
}

View file

@ -2,6 +2,7 @@ pub mod build;
pub mod eval;
pub mod evaluate;
pub mod evaluationfilter;
pub mod gerrit;
pub mod log_message_collector;
pub mod pastebin_collector;
pub mod statscollector;