Compare commits

...

2 commits

Author SHA1 Message Date
ecdd6d52a6 feat: statcheck worker
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-11-14 18:38:16 +01:00
b79a48bd73 feat: Gerrit HTTP API and partial VCS implementation
This implements the Gerrit surface of the VCS API async API via HTTP.

TODO:

- Event streamer should go somewhere else
- We need to replace the missing features

There's some impedence mismatch on IDs but this can be solved by harder
refactors.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-11-14 18:38:14 +01:00
10 changed files with 485 additions and 103 deletions

4
Cargo.lock generated
View file

@ -1114,9 +1114,9 @@ dependencies = [
[[package]]
name = "hashbrown"
version = "0.15.0"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3"
[[package]]
name = "heck"

View file

@ -2,8 +2,11 @@ use std::env;
use std::error::Error;
use std::path::Path;
use std::process;
use std::sync::RwLock;
use async_std::task;
use ofborg::config::VCSConfig;
use ofborg::tasks::evaluate::SupportedVCS;
use tracing::{error, info};
use ofborg::checkout;
@ -51,11 +54,16 @@ fn main() -> Result<(), Box<dyn Error>> {
no_wait: false,
})?;
let vcs_data = match cfg.vcs {
VCSConfig::GitHub => SupportedVCS::GitHub(RwLock::new(cfg.github_app_vendingmachine())),
VCSConfig::Gerrit => SupportedVCS::Gerrit,
};
let handle = easylapin::WorkerChannel(chan).consume(
tasks::evaluate::EvaluationWorker::new(
cloner,
&nix,
cfg.github_app_vendingmachine(),
vcs_data,
cfg.acl(),
cfg.runner.identity.clone(),
events,

View file

@ -0,0 +1,79 @@
/// Statuses and checks worker
/// - will keep a database of changes
/// - their statuses
/// - their checks
/// - is VCS/CI agnostic
use std::env;
use std::error::Error;
use async_std::task;
use ofborg::config;
use ofborg::easyamqp;
use ofborg::easyamqp::ChannelExt;
use ofborg::easyamqp::ConsumerExt;
use ofborg::easylapin;
use ofborg::tasks;
use tracing::info;
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: statcheck-worker <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq)?;
let mut chan = task::block_on(conn.create_channel())?;
// an RPC queue for verbs
let api_queue_name = "statcheck-api".to_owned();
// an event queue to be notified about statuses & checks changes.
let event_queue_name = "statcheck-events".to_owned();
chan.declare_exchange(easyamqp::ExchangeConfig {
exchange: api_queue_name.clone(),
exchange_type: easyamqp::ExchangeType::Topic,
passive: false,
durable: true,
auto_delete: false,
no_wait: false,
internal: false,
})?;
chan.declare_queue(easyamqp::QueueConfig {
queue: api_queue_name.clone(),
passive: false,
durable: true,
exclusive: false,
auto_delete: false,
no_wait: false,
})?;
chan.bind_queue(easyamqp::BindQueueConfig {
queue: api_queue_name.clone(),
exchange: api_queue_name.clone(),
routing_key: None,
no_wait: false,
})?;
let handle = easylapin::WorkerChannel(chan).consume(
tasks::statcheck_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
easyamqp::ConsumeConfig {
queue: api_queue_name.clone(),
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
},
)?;
info!("Waiting for API calls on {}", api_queue_name);
info!("Notifying of new changes on {}", event_queue_name);
task::block_on(handle);
drop(conn); // Close connection.
info!("Closed the session... EOF");
Ok(())
}

View file

@ -12,6 +12,12 @@ use hubcaps::{Credentials, Github, InstallationTokenGenerator, JWTCredentials};
use serde::de::{self, Deserialize, Deserializer};
use tracing::{debug, error, info, warn};
#[derive(Serialize, Deserialize, Debug)]
pub enum VCSConfig {
GitHub,
Gerrit,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Config {
pub runner: RunnerConfig,
@ -19,10 +25,16 @@ pub struct Config {
pub checkout: CheckoutConfig,
pub nix: NixConfig,
pub rabbitmq: RabbitMqConfig,
pub github: Option<GithubConfig>,
pub github_app: Option<GithubAppConfig>,
pub vcs: VCSConfig,
pub pastebin: PastebinConfig,
pub log_storage: Option<LogStorage>,
// GitHub-specific configuration if vcs == GitHub.
pub github: Option<GithubConfig>,
pub github_app: Option<GithubAppConfig>,
// Gerrit-specific configuration if vcs == Gerrit.
pub gerrit: Option<GerritConfig>,
}
#[derive(Serialize, Deserialize, Debug)]
@ -58,6 +70,21 @@ pub struct NixConfig {
pub initial_heap_size: Option<String>,
}
const fn default_gerrit_ssh_port() -> u16 {
29418
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GerritConfig {
// For HTTP API.
pub access_token_file: PathBuf,
// For event streaming.
pub ssh_private_key_file: PathBuf,
pub instance_uri: String,
#[serde(default = "default_gerrit_ssh_port")]
pub ssh_port: u16,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GithubConfig {
pub token_file: PathBuf,

View file

@ -11,6 +11,7 @@ use crate::tasks::eval;
use crate::utils::pastebin::PersistedPastebin;
use crate::vcs::commit_status::{CommitStatus, CommitStatusError};
use crate::vcs::generic::{Issue, IssueState, State, VersionControlSystemAPI};
use crate::vcs::gerrit::http::GerritHTTPApi;
use crate::vcs::github::compat::GitHubAPI;
use crate::worker;
@ -21,10 +22,15 @@ use std::time::Instant;
use tracing::{debug_span, error, info, warn};
pub enum SupportedVCS {
GitHub(RwLock<GithubAppVendingMachine>),
Gerrit,
}
pub struct EvaluationWorker<E> {
cloner: checkout::CachedCloner,
nix: nix::Nix,
github_vend: RwLock<GithubAppVendingMachine>,
vcs: SupportedVCS,
acl: Acl,
identity: String,
events: E,
@ -35,7 +41,7 @@ impl<E: stats::SysEvents> EvaluationWorker<E> {
pub fn new(
cloner: checkout::CachedCloner,
nix: &nix::Nix,
github_vend: GithubAppVendingMachine,
vcs: SupportedVCS,
acl: Acl,
identity: String,
events: E,
@ -43,7 +49,7 @@ impl<E: stats::SysEvents> EvaluationWorker<E> {
EvaluationWorker {
cloner,
nix: nix.without_limited_supported_systems(),
github_vend: RwLock::new(github_vend),
vcs,
acl,
identity,
events,
@ -81,20 +87,21 @@ impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E>
let span = debug_span!("job", change_id = ?job.change.number);
let _enter = span.enter();
// TODO: introduce dynamic dispatcher instantiation here for the VCS API.
let mut vending_machine = self
.github_vend
let vcs_api: Rc<dyn VersionControlSystemAPI> = match self.vcs {
SupportedVCS::GitHub(ref vending_machine) => {
let mut vending_machine = vending_machine
.write()
.expect("Failed to get write lock on github vending machine");
let github_client = vending_machine
.for_repo(&job.repo.owner, &job.repo.name)
.expect("Failed to get a github client token");
let github_api = Rc::new(GitHubAPI::new(github_client.clone()));
Rc::new(GitHubAPI::new(github_client.clone()))
}
SupportedVCS::Gerrit => Rc::new(GerritHTTPApi),
};
OneEval::new(
github_api,
vcs_api,
&self.nix,
&self.acl,
&mut self.events,

View file

@ -0,0 +1,46 @@
use std::path::PathBuf;
use tracing::{debug_span, error};
use crate::worker;
#[derive(Serialize, Deserialize, Debug)]
enum StatusCheckRPCMessage {
ListStatuses,
ListChecks,
}
#[allow(dead_code)]
struct StatusCheckCollector {
db_path: PathBuf,
}
// RPC API worker
impl worker::SimpleWorker for StatusCheckCollector {
type J = StatusCheckRPCMessage;
fn msg_to_job(
&mut self,
_method: &str,
_headers: &Option<String>,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) {
Ok(e) => Ok(e),
Err(e) => {
error!(
"Failed to deserialize StatusCheckRPCMessage: {:?}",
String::from_utf8(body.to_vec())
);
panic!("{:?}", e);
}
}
}
fn consumer(&mut self, job: &Self::J) -> worker::Actions {
let span = debug_span!("command");
let _enter = span.enter();
vec![worker::Action::Ack]
}
}

View file

@ -9,6 +9,21 @@ pub struct Account {
pub username: Option<String>, // User's username, if configured
}
impl From<Vec<Account>> for crate::vcs::generic::ChangeReviewers {
fn from(value: Vec<Account>) -> Self {
// FIXME: I don't think Gerrit remembers when we added a group instead of entities.
crate::vcs::generic::ChangeReviewers {
entity_reviewers: value
.into_iter()
// FIXME: this is a… quite the assumption, let's relax it by having at _least_ one
// identity identifier.
.map(|a| a.username.expect("Expected username"))
.collect(),
team_reviewers: vec![],
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Approval {
pub r#type: String, // Internal name of the approval
@ -191,6 +206,18 @@ pub struct Change {
pub all_reviewers: Vec<Account>, // List of all reviewers
}
impl From<Change> for crate::message::Change {
fn from(value: Change) -> Self {
Self {
target_branch: Some(value.branch),
// While the change number is deprecated, we actually need it.
// FIXME: enforce type level checking of this.
number: value.change_number.unwrap(),
head_sha: value.current_patch_set.revision,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct RefUpdate {
#[serde(rename = "oldRev")]
@ -209,25 +236,25 @@ pub enum GerritStreamEvent {
patch_set: PatchSet,
abandoner: Account,
reason: String,
event_created_on: u64
event_created_on: u64,
},
ChangeDeleted {
change: Change,
deleter: Account
deleter: Account,
},
ChangeMerged {
change: Change,
patch_set: PatchSet,
submitter: Account,
new_rev: String,
event_created_on: u64
event_created_on: u64,
},
ChangeRestored {
change: Change,
patch_set: PatchSet,
restorer: Account,
reason: String,
event_created_on: u64
event_created_on: u64,
},
CommentAdded {
change: Change,
@ -235,7 +262,7 @@ pub enum GerritStreamEvent {
author: Account,
approvals: Vec<Approval>,
comment: String,
event_created_on: u64
event_created_on: u64,
},
DroppedOutput,
HashtagsChanged {
@ -249,30 +276,30 @@ pub enum GerritStreamEvent {
ProjectCreated {
project_name: String,
project_head: String,
event_created_on: u64
event_created_on: u64,
},
PatchSetCreated {
change: Change,
patch_set: PatchSet,
uploader: Account,
event_created_on: u64
event_created_on: u64,
},
RefUpdated {
submitter: Account,
ref_update: RefUpdate,
event_created_on: u64
event_created_on: u64,
},
BatchRefUpdated {
submitter: Account,
ref_updates: Vec<RefUpdate>,
event_created_on: u64
event_created_on: u64,
},
ReviewerAdded {
change: Change,
patch_set: PatchSet,
reviewer: Account,
adder: Account,
event_created_on: u64
event_created_on: u64,
},
ReviewerDeleted {
change: Change,
@ -288,19 +315,19 @@ pub enum GerritStreamEvent {
old_topic: Option<String>,
new_topic: Option<String>,
changer: Account,
event_created_on: u64
event_created_on: u64,
},
WorkInProgressStateChanged {
change: Change,
patch_set: PatchSet,
changer: Account,
event_created_on: u64
event_created_on: u64,
},
PrivateStateChanged {
change: Change,
patch_set: PatchSet,
changer: Account,
event_created_on: u64
event_created_on: u64,
},
VoteDeleted {
change: Change,
@ -313,6 +340,6 @@ pub enum GerritStreamEvent {
ProjectHeadUpdate {
old_head: String,
new_head: String,
event_created_on: u64
}
event_created_on: u64,
},
}

View file

@ -0,0 +1,49 @@
//! REST API bindings for Gerrit
//! TODO:
//! - trace IDs support
//! - label support
use super::data_structures::{Account, Change};
pub struct GerritHTTPApi;
impl GerritHTTPApi {
// async fn get_project(&self, project_name: &str) -> Project {}
/// Fetches all changes according to the query and the given limit.
/// This will default to 60 changes by default.
pub(crate) async fn list_changes(&self, _query: &str, _limit: Option<u64>) -> Vec<Change> {
Default::default()
}
/// Fetch the latest change ID for a given project and CL number.
pub(crate) async fn get_change_id(&self, _project_name: &str, _cl_number: u64) -> String {
"".to_owned()
}
/// Fetch a given change according to the change ID (not the CL number).
pub(crate) async fn get_change(&self, _change_id: &str) -> Option<Change> {
Default::default()
}
/// Set additional and remove certain hashtags for a given change ID (not the CL number).
pub(crate) async fn set_hashtags(
&self,
_change_id: &str,
_add: &[String],
_remove: &[String],
) -> Vec<String> {
Default::default()
}
/// List all reviewers on a given change ID (not the CL number).
pub(crate) async fn list_reviewers(&self, _change_id: &str) -> Vec<Account> {
Default::default()
}
/// Set reviewers and a message on a given change ID (not the CL number).
pub(crate) async fn set_reviewers(
&self,
_change_id: &str,
_message: &str,
_reviewers: Vec<Account>,
) {
}
}

View file

@ -0,0 +1,137 @@
//! Implementation of the VCS API for Gerrit
//! This uses the HTTP API.
use futures_util::FutureExt;
use crate::vcs::generic::VersionControlSystemAPI;
use super::{data_structures::Account, http::GerritHTTPApi};
impl VersionControlSystemAPI for GerritHTTPApi {
// The next three APIs are todo!() because they cannot be implemented in Gerrit.
// Gerrit does not offer any way to get this information out.
// GerritHTTPApi needs to return something like Unsupported
// and we need to compose a GerritHTTPApi with a GerritForge which contains an implementation
// of check statuses and commit statuses and an issue tracker.
fn create_check_statuses(
&self,
_repo: &crate::message::Repo,
_checks: Vec<crate::vcs::generic::CheckRunOptions>,
) -> futures_util::future::BoxFuture<()> {
todo!();
}
fn create_commit_statuses(
&self,
_repo: &crate::message::Repo,
_sha: String,
_state: crate::vcs::generic::State,
_context: String,
_description: String,
_target_url: String,
) -> futures_util::future::BoxFuture<Result<(), crate::vcs::commit_status::CommitStatusError>>
{
todo!();
}
fn get_issue(
&self,
_repo: &crate::message::Repo,
_number: u64,
) -> futures_util::future::BoxFuture<Result<crate::vcs::generic::Issue, String>> {
todo!();
}
fn get_repository(&self, _repo: &crate::message::Repo) -> crate::vcs::generic::Repository {
todo!();
}
fn get_changes(
&self,
repo: &crate::message::Repo,
) -> futures_util::future::BoxFuture<Vec<crate::message::Change>> {
let repo_name = repo.name.to_owned();
async move {
self.list_changes(&format!("project:{}", &repo_name), None)
.await
.into_iter()
.map(|c| c.into())
.collect()
}
.boxed()
}
fn get_change(
&self,
repo: &crate::message::Repo,
number: u64,
) -> futures_util::future::BoxFuture<Option<crate::message::Change>> {
let repo_name = repo.name.to_owned();
async move {
let change_id = self.get_change_id(&repo_name, number).await;
GerritHTTPApi::get_change(&self, &change_id)
.await
.map(|c| c.into())
}
.boxed()
}
fn update_labels(
&self,
repo: &crate::message::Repo,
number: u64,
add: &[String],
remove: &[String],
) -> futures_util::future::BoxFuture<()> {
let add = add.to_owned();
let remove = remove.to_owned();
let repo_name = repo.name.to_owned();
async move {
let change_id = self.get_change_id(&repo_name, number).await;
self.set_hashtags(&change_id, &add, &remove).await;
}
.boxed()
}
fn get_existing_reviewers(
&self,
repo: &crate::message::Repo,
number: u64,
) -> futures_util::future::BoxFuture<crate::vcs::generic::ChangeReviewers> {
let repo_name = repo.name.to_owned();
async move {
let change_id = self.get_change_id(&repo_name, number).await;
self.list_reviewers(&change_id).await.into()
}
.boxed()
}
fn request_reviewers(
&self,
repo: &crate::message::Repo,
number: u64,
entity_reviewers: Vec<String>,
// FIXME: support group reviews
_team_reviewers: Vec<String>,
) -> futures_util::future::BoxFuture<()> {
let repo_name = repo.name.to_owned();
async move {
let change_id = self.get_change_id(&repo_name, number).await;
self.set_reviewers(
&change_id,
"Automatic reviewer request",
entity_reviewers
.into_iter()
.map(|reviewer| Account {
username: Some(reviewer),
email: None,
name: None,
})
.collect(),
)
.await
}
.boxed()
}
}

View file

@ -1,3 +1,5 @@
pub mod checks;
pub mod data_structures;
pub mod http;
pub mod r#impl;
// pub mod events;