Compare commits
5 commits
3af7e6976b
...
bb08c8cb97
Author | SHA1 | Date | |
---|---|---|---|
bb08c8cb97 | |||
6a628c3cdd | |||
2b9a366b24 | |||
c8605f7429 | |||
69f6ae8cc2 |
8 changed files with 234 additions and 101 deletions
|
@ -51,17 +51,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
)
|
||||
.await;
|
||||
|
||||
let routing_key = "abc";
|
||||
// TODO: add the specific instance name in multi-active contexts.
|
||||
let routing_key = "gerrit.events";
|
||||
|
||||
let event_stream = gerrit_api.stream_events().await.unwrap();
|
||||
pin_mut!(event_stream);
|
||||
|
||||
loop {
|
||||
let raw_evt = event_stream.next().await;
|
||||
tracing::debug!("{:?}", raw_evt);
|
||||
|
||||
match raw_evt {
|
||||
Some(Ok(event)) => {
|
||||
println!("{:#?}", event);
|
||||
let queue_message =
|
||||
prepare_queue_message(Some(exchange_name), Some(routing_key), &event);
|
||||
let props = BasicProperties::default()
|
||||
|
@ -86,11 +87,11 @@ async fn main() -> Result<(), Box<dyn Error>> {
|
|||
}
|
||||
}
|
||||
}
|
||||
Some(Err(_err)) => {
|
||||
// notify the event
|
||||
Some(Err(err)) => {
|
||||
tracing::error!("Failed to read a new Gerrit event: {}", err);
|
||||
}
|
||||
None => {
|
||||
// notify the event
|
||||
tracing::warn!("No event in the Gerrit stream, retrying...");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
86
ofborg/src/bin/gerrit-generic-vcs-filter.rs
Normal file
86
ofborg/src/bin/gerrit-generic-vcs-filter.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
/// This converts a Gerrit event to a OfBorg's VCS general event.
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
|
||||
use ofborg::easyamqp::{self, ChannelExt, ConsumerExt};
|
||||
use ofborg::easylapin;
|
||||
use ofborg::tasks;
|
||||
|
||||
use ofborg::config;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: gerrit-generic-vcs-filter <config>");
|
||||
let cfg = config::load(arg.as_ref());
|
||||
|
||||
let conn = easylapin::from_config(&cfg.rabbitmq).await?;
|
||||
let mut chan = conn.create_channel().await?;
|
||||
|
||||
// Destination exchange: `vcs-events`
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: "vcs-events".to_owned(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Source exchange: `gerrit-events`
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: "gerrit-events".to_owned(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Source queue: `gerrit-generic-filter`
|
||||
let queue_name = String::from("gerrit-generic-filter");
|
||||
chan.declare_queue(easyamqp::QueueConfig {
|
||||
queue: queue_name.clone(),
|
||||
passive: false,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
chan.bind_queue(easyamqp::BindQueueConfig {
|
||||
queue: queue_name.clone(),
|
||||
exchange: "gerrit-events".to_owned(),
|
||||
routing_key: Some("gerrit.events.*".to_owned()),
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
tracing::info!("Fetching jobs from '{}'", &queue_name);
|
||||
easylapin::WorkerChannel(chan)
|
||||
.consume(
|
||||
tasks::gerrit::GerritEventFilterWorker::new(),
|
||||
easyamqp::ConsumeConfig {
|
||||
queue: queue_name.clone(),
|
||||
consumer_tag: format!("{}-gerrit-generic-vcs-filter", cfg.whoami()),
|
||||
no_local: false,
|
||||
no_ack: false,
|
||||
no_wait: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(conn);
|
||||
tracing::info!("Closed the session... EOF");
|
||||
|
||||
Ok(())
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
/// This is a Gerrit listener for events which puts them on stdout for debugging purposes.
|
||||
/// The list of event type listened to is static.
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
|
||||
use futures::{pin_mut, StreamExt};
|
||||
use ofborg::vcs::gerrit::ssh::GerritSSHApi;
|
||||
use tracing::info;
|
||||
|
||||
use ofborg::config;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: listen-gerrit-events <config>");
|
||||
let cfg = config::load(arg.as_ref());
|
||||
|
||||
let gerrit_cfg = cfg
|
||||
.gerrit
|
||||
.expect("Gerrit event streaming requires a Gerrit configuration");
|
||||
let gerrit_ssh_uri = format!("ssh://{}:{}", gerrit_cfg.instance_uri, gerrit_cfg.ssh_port);
|
||||
info!("Listening events from Gerrit on {}", gerrit_ssh_uri);
|
||||
let mut gerrit_api = GerritSSHApi::new(gerrit_cfg.ssh_private_key_file, &gerrit_ssh_uri).await;
|
||||
|
||||
let event_stream = gerrit_api.stream_events().await.unwrap();
|
||||
pin_mut!(event_stream);
|
||||
loop {
|
||||
let thing = event_stream.next().await;
|
||||
println!("{:?}", thing);
|
||||
if let Some(Ok(event)) = thing {
|
||||
println!("{:#?}", event);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,5 +3,6 @@ pub mod buildlogmsg;
|
|||
pub mod buildresult;
|
||||
mod common;
|
||||
pub mod evaluationjob;
|
||||
pub mod vcs;
|
||||
|
||||
pub use self::common::{Change, Repo};
|
||||
|
|
12
ofborg/src/message/vcs.rs
Normal file
12
ofborg/src/message/vcs.rs
Normal file
|
@ -0,0 +1,12 @@
|
|||
/// VCS generic events
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::{Change, Repo};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub enum VCSEvent {
|
||||
ChangeCreated { change: Change, repo: Repo },
|
||||
ChangeDeleted { change: Change, repo: Repo },
|
||||
ChangeUpdated { change: Change, repo: Repo },
|
||||
WIPStatusChange { change: Change, wip: bool },
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
use crate::acl;
|
||||
use crate::ghevent;
|
||||
use crate::message::{evaluationjob, Change, Repo};
|
||||
use crate::message::evaluationjob;
|
||||
use crate::message::vcs::VCSEvent;
|
||||
use crate::worker;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
@ -19,7 +19,7 @@ impl EvaluationFilterWorker {
|
|||
|
||||
#[async_trait]
|
||||
impl worker::SimpleWorker for EvaluationFilterWorker {
|
||||
type J = ghevent::PullRequestEvent;
|
||||
type J = VCSEvent;
|
||||
|
||||
async fn msg_to_job(
|
||||
&mut self,
|
||||
|
@ -37,70 +37,24 @@ impl worker::SimpleWorker for EvaluationFilterWorker {
|
|||
}
|
||||
}
|
||||
|
||||
async fn consumer(
|
||||
&mut self,
|
||||
_chan: &mut lapin::Channel,
|
||||
job: &ghevent::PullRequestEvent,
|
||||
) -> worker::Actions {
|
||||
let span = debug_span!("job", pr = ?job.number);
|
||||
async fn consumer(&mut self, _chan: &mut lapin::Channel, job: &VCSEvent) -> worker::Actions {
|
||||
let span = debug_span!("event", event = ?job);
|
||||
let _enter = span.enter();
|
||||
|
||||
if !self.acl.is_repo_eligible(&job.repository.full_name) {
|
||||
info!("Repo not authorized ({})", job.repository.full_name);
|
||||
return vec![worker::Action::Ack];
|
||||
}
|
||||
|
||||
if job.pull_request.state != ghevent::PullRequestState::Open {
|
||||
info!(
|
||||
"PR is not open ({}#{})",
|
||||
job.repository.full_name, job.number
|
||||
);
|
||||
return vec![worker::Action::Ack];
|
||||
}
|
||||
|
||||
let interesting: bool = match job.action {
|
||||
ghevent::PullRequestAction::Opened
|
||||
| ghevent::PullRequestAction::Synchronize
|
||||
| ghevent::PullRequestAction::Reopened => true,
|
||||
ghevent::PullRequestAction::Edited => {
|
||||
if let Some(ref changes) = job.changes {
|
||||
changes.base.is_some()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
ghevent::PullRequestAction::Unknown => false,
|
||||
let (change, repo) = match job {
|
||||
VCSEvent::ChangeCreated { change, repo } => (change, repo),
|
||||
VCSEvent::ChangeUpdated { change, repo } => (change, repo),
|
||||
_ => return vec![worker::Action::Ack],
|
||||
};
|
||||
|
||||
if !interesting {
|
||||
info!(
|
||||
"Not interesting: {}#{} because of {:?}",
|
||||
job.repository.full_name, job.number, job.action
|
||||
);
|
||||
|
||||
if !self.acl.is_repo_eligible(&repo.full_name) {
|
||||
info!("Repo not authorized ({})", &repo.full_name);
|
||||
return vec![worker::Action::Ack];
|
||||
}
|
||||
|
||||
info!(
|
||||
"Found {}#{} to be interesting because of {:?}",
|
||||
job.repository.full_name, job.number, job.action
|
||||
);
|
||||
let repo_msg = Repo {
|
||||
clone_url: job.repository.clone_url.clone(),
|
||||
full_name: job.repository.full_name.clone(),
|
||||
owner: job.repository.owner.login.clone(),
|
||||
name: job.repository.name.clone(),
|
||||
};
|
||||
|
||||
let change_msg = Change {
|
||||
number: job.number,
|
||||
head_sha: job.pull_request.head.sha.clone(),
|
||||
target_branch: Some(job.pull_request.base.git_ref.clone()),
|
||||
};
|
||||
|
||||
let msg = evaluationjob::EvaluationJob {
|
||||
repo: repo_msg,
|
||||
change: change_msg,
|
||||
repo: repo.clone(),
|
||||
change: change.clone(),
|
||||
};
|
||||
|
||||
vec![
|
||||
|
|
115
ofborg/src/tasks/gerrit/mod.rs
Normal file
115
ofborg/src/tasks/gerrit/mod.rs
Normal file
|
@ -0,0 +1,115 @@
|
|||
/// This transforms a Gerrit event into a OfBorg event.
|
||||
use crate::message::vcs::VCSEvent;
|
||||
use crate::vcs::gerrit::data_structures::GerritStreamEvent;
|
||||
use crate::worker;
|
||||
use async_trait::async_trait;
|
||||
use tracing::debug_span;
|
||||
|
||||
pub struct GerritEventFilterWorker;
|
||||
|
||||
impl GerritEventFilterWorker {
|
||||
pub fn new() -> Self {
|
||||
Self {}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl worker::SimpleWorker for GerritEventFilterWorker {
|
||||
type J = GerritStreamEvent;
|
||||
|
||||
async fn msg_to_job(
|
||||
&mut self,
|
||||
_: &str,
|
||||
_: &Option<String>,
|
||||
body: &[u8],
|
||||
) -> Result<Self::J, String> {
|
||||
match serde_json::from_slice(body) {
|
||||
Ok(e) => Ok(e),
|
||||
Err(e) => Err(format!(
|
||||
"Failed to deserialize event {:?}: {:?}",
|
||||
e,
|
||||
String::from_utf8(body.to_vec())
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn consumer(
|
||||
&mut self,
|
||||
_chan: &mut lapin::Channel,
|
||||
evt: &GerritStreamEvent,
|
||||
) -> worker::Actions {
|
||||
let span = debug_span!("event");
|
||||
let _enter = span.enter();
|
||||
|
||||
let evt: Option<VCSEvent> = match evt {
|
||||
GerritStreamEvent::DroppedOutput => {
|
||||
tracing::warn!("Gerrit reported loss of output!");
|
||||
// TODO: Update the stat worker about this.
|
||||
|
||||
None
|
||||
}
|
||||
GerritStreamEvent::ChangeDeleted { change, .. }
|
||||
| GerritStreamEvent::ChangeAbandoned { change, .. } => Some(VCSEvent::ChangeDeleted {
|
||||
change: crate::message::Change {
|
||||
target_branch: Some(change.branch.clone()),
|
||||
number: change
|
||||
.change_number
|
||||
.expect("Change deletion payload should contain a CL number"),
|
||||
head_sha: change.current_patch_set.as_ref().unwrap().r#ref.clone(),
|
||||
},
|
||||
repo: crate::message::Repo {
|
||||
owner: "no one really".to_string(),
|
||||
name: "nixpkgs".to_string(),
|
||||
full_name: "what is even this field".to_owned(),
|
||||
clone_url: "somewhere on the internet; why do you care?".to_owned(),
|
||||
},
|
||||
}),
|
||||
GerritStreamEvent::ChangeRestored { change, .. }
|
||||
| GerritStreamEvent::PatchSetCreated { change, .. } => Some(VCSEvent::ChangeCreated {
|
||||
change: crate::message::Change {
|
||||
target_branch: Some(change.branch.clone()),
|
||||
number: change
|
||||
.change_number
|
||||
.expect("Patch set creation payload should contain a CL number"),
|
||||
head_sha: change
|
||||
.current_patch_set
|
||||
.as_ref()
|
||||
.expect("Patch set creation payload should contain a current patch set")
|
||||
.r#ref
|
||||
.clone(),
|
||||
},
|
||||
repo: crate::message::Repo {
|
||||
owner: "no one really".to_string(),
|
||||
name: "nixpkgs".to_string(),
|
||||
full_name: "what is even this field".to_owned(),
|
||||
clone_url: "somewhere on the internet; why do you care?".to_owned(),
|
||||
},
|
||||
}),
|
||||
GerritStreamEvent::WorkInProgressStateChanged {
|
||||
change, patch_set, ..
|
||||
} => Some(VCSEvent::WIPStatusChange {
|
||||
change: crate::message::Change {
|
||||
target_branch: Some(change.branch.clone()),
|
||||
number: change.change_number.unwrap(),
|
||||
head_sha: patch_set.r#ref.clone(),
|
||||
},
|
||||
wip: change.wip.unwrap(),
|
||||
}),
|
||||
// TODO: CommentAdded / HashtagsChanged / TopicChanged
|
||||
// propagate them for chatops support and tagging manipulations
|
||||
// TODO: ChangeMerged
|
||||
// dispatch something to support periodic jobs or further workflows after a commit is
|
||||
// merged, e.g. performance CI, etc.
|
||||
_ => None,
|
||||
};
|
||||
|
||||
if let Some(evt) = evt {
|
||||
vec![
|
||||
worker::publish_serde_action(&None, &None, &evt),
|
||||
worker::Action::Ack,
|
||||
]
|
||||
} else {
|
||||
vec![worker::Action::Ack]
|
||||
}
|
||||
}
|
||||
}
|
|
@ -2,6 +2,7 @@ pub mod build;
|
|||
pub mod eval;
|
||||
pub mod evaluate;
|
||||
pub mod evaluationfilter;
|
||||
pub mod gerrit;
|
||||
pub mod log_message_collector;
|
||||
pub mod pastebin_collector;
|
||||
pub mod statscollector;
|
||||
|
|
Loading…
Reference in a new issue