feat: add a debug Gerrit event listener

This should be piped into `ofborgctl listen gerrit-events` or something
later on.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-14 22:19:21 +01:00
parent ba4b005719
commit 42546e5bc7
6 changed files with 197 additions and 39 deletions

52
Cargo.lock generated
View file

@ -384,6 +384,28 @@ dependencies = [
"wasm-bindgen-futures", "wasm-bindgen-futures",
] ]
[[package]]
name = "async-stream"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476"
dependencies = [
"async-stream-impl",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "async-stream-impl"
version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-task" name = "async-task"
version = "4.7.1" version = "4.7.1"
@ -960,6 +982,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-executor",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
@ -982,6 +1005,17 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.31" version = "0.3.31"
@ -1045,10 +1079,13 @@ version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81"
dependencies = [ dependencies = [
"futures-channel",
"futures-core", "futures-core",
"futures-io",
"futures-macro", "futures-macro",
"futures-sink", "futures-sink",
"futures-task", "futures-task",
"memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab", "slab",
@ -1716,12 +1753,14 @@ name = "ofborg"
version = "0.90.0" version = "0.90.0"
dependencies = [ dependencies = [
"async-std", "async-std",
"async-stream",
"base64 0.22.1", "base64 0.22.1",
"brace-expand", "brace-expand",
"chrono", "chrono",
"clap", "clap",
"either", "either",
"fs2", "fs2",
"futures",
"futures-util", "futures-util",
"http", "http",
"hubcaps", "hubcaps",
@ -1742,6 +1781,8 @@ dependencies = [
"sys-info", "sys-info",
"tempfile", "tempfile",
"thiserror 1.0.69", "thiserror 1.0.69",
"tokio",
"tokio-stream",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid", "uuid",
@ -2858,6 +2899,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.12" version = "0.7.12"

View file

@ -5,6 +5,10 @@
"checkout": { "checkout": {
"root": "$STATE_DIRECTORY/.checkouts" "root": "$STATE_DIRECTORY/.checkouts"
}, },
"vcs": "Gerrit",
"gerrit": {
"instance_uri": "cl.forkos.org"
},
"nix": { "nix": {
"system": "x86_64-linux", "system": "x86_64-linux",
"remote": "daemon", "remote": "daemon",

View file

@ -44,4 +44,8 @@ jfs = "0.9.0"
base64 = "0.22.1" base64 = "0.22.1"
thiserror = "1.0.67" thiserror = "1.0.67"
openssh = { version = "0.11.3", features = ["process-mux"], default-features = false } openssh = { version = "0.11.3", features = ["process-mux"], default-features = false }
futures = "0.3.31"
tokio = "1.41.1"
tokio-stream = { version = "0.1.16", features = ["io-util"] }
async-stream = "0.3.6"
# reqwest = "0.12.9" # reqwest = "0.12.9"

View file

@ -0,0 +1,42 @@
/// This is a Gerrit listener for events which puts them on stdout for debugging purposes.
/// The list of event type listened to is static.
use std::env;
use std::error::Error;
use async_std::stream::StreamExt;
use async_std::task;
use futures::pin_mut;
use ofborg::vcs::gerrit::ssh::GerritSSHApi;
use tracing::info;
use ofborg::config;
fn main() -> Result<(), Box<dyn Error>> {
ofborg::setup_log();
let arg = env::args()
.nth(1)
.expect("usage: listen-gerrit-events <config>");
let cfg = config::load(arg.as_ref());
let gerrit_cfg = cfg
.gerrit
.expect("Gerrit event streaming requires a Gerrit configuration");
let gerrit_ssh_uri = format!("ssh://{}:{}", gerrit_cfg.instance_uri, gerrit_cfg.ssh_port);
info!("Listening events from Gerrit on {}", gerrit_ssh_uri);
let mut gerrit_api = GerritSSHApi::new(gerrit_cfg.ssh_private_key_file, &gerrit_ssh_uri);
task::block_on(async {
let event_stream = gerrit_api.stream_events().await.unwrap();
pin_mut!(event_stream);
loop {
let thing = event_stream.next().await;
println!("{:?}", thing);
if let Some(Ok(event)) = thing {
println!("{:#?}", event);
}
}
});
Ok(())
}

View file

@ -28,12 +28,12 @@ impl From<Vec<Account>> for crate::vcs::generic::ChangeReviewers {
pub struct Approval { pub struct Approval {
pub r#type: String, // Internal name of the approval pub r#type: String, // Internal name of the approval
pub description: String, // Human-readable category of the approval pub description: String, // Human-readable category of the approval
pub value: i32, // Value assigned by the approval (usually a numerical score) pub value: String, // Value assigned by the approval (usually a numerical score)
#[serde(rename = "oldValue")] #[serde(rename = "oldValue")]
pub old_value: Option<i32>, // Previous approval score, if present pub old_value: Option<String>, // Previous approval score, if present
#[serde(rename = "grantedOn")] #[serde(rename = "grantedOn")]
pub granted_on: u64, // Time in seconds since the UNIX epoch pub granted_on: Option<u64>, // Time in seconds since the UNIX epoch
pub by: Account, // Reviewer of the patch set pub by: Option<Account>, // Reviewer of the patch set
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
@ -87,9 +87,12 @@ pub struct PatchSet {
#[serde(rename = "createdOn")] #[serde(rename = "createdOn")]
pub created_on: u64, // Time in seconds since the UNIX epoch pub created_on: u64, // Time in seconds since the UNIX epoch
pub kind: Kind, // Kind of change ("REWORK", "TRIVIAL_REBASE", etc.) pub kind: Kind, // Kind of change ("REWORK", "TRIVIAL_REBASE", etc.)
#[serde(default = "Default::default")]
pub approvals: Vec<Approval>, // Approvals granted pub approvals: Vec<Approval>, // Approvals granted
#[serde(default = "Default::default")]
pub comments: Vec<PatchSetComment>, // All comments for this patchset pub comments: Vec<PatchSetComment>, // All comments for this patchset
pub files: Vec<String>, // All changed files in this patchset #[serde(default = "Default::default")]
pub files: Vec<String>, // All changed files in this patchset
#[serde(rename = "sizeInsertions")] #[serde(rename = "sizeInsertions")]
pub size_insertions: i64, // Size of insertions pub size_insertions: i64, // Size of insertions
#[serde(rename = "sizeDeletions")] #[serde(rename = "sizeDeletions")]
@ -179,30 +182,32 @@ pub struct Change {
pub subject: String, pub subject: String,
pub owner: Account, pub owner: Account,
pub url: String, pub url: String,
pub commit_message: String, pub commit_message: Option<String>,
#[serde(default = "Default::default")]
pub hashtags: Vec<String>, pub hashtags: Vec<String>,
#[serde(rename = "createdOn")] #[serde(rename = "createdOn")]
pub created_on: u64, // Time in seconds since UNIX epoch pub created_on: u64, // Time in seconds since UNIX epoch
#[serde(rename = "lastUpdated")] #[serde(rename = "lastUpdated")]
pub last_updated: u64, // Time in seconds since UNIX epoch pub last_updated: Option<u64>, // Time in seconds since UNIX epoch
pub open: bool, pub open: Option<bool>,
pub status: ChangeStatus, // "NEW", "MERGED", or "ABANDONED" pub status: ChangeStatus, // "NEW", "MERGED", or "ABANDONED"
pub private: bool, pub private: Option<bool>,
pub wip: bool, // Work in progress pub wip: Option<bool>, // Work in progress
#[serde(default = "Default::default")]
pub comments: Vec<ReviewerMessage>, // Inline/file comments pub comments: Vec<ReviewerMessage>, // Inline/file comments
#[serde(rename = "trackingIds")] #[serde(rename = "trackingIds", default = "Default::default")]
pub tracking_ids: Vec<TrackingId>, // Links to issue tracking systems pub tracking_ids: Vec<TrackingId>, // Links to issue tracking systems
#[serde(rename = "currentPatchSet")] #[serde(rename = "currentPatchSet")]
pub current_patch_set: PatchSet, pub current_patch_set: Option<PatchSet>,
#[serde(rename = "patchSets")] #[serde(rename = "patchSets", default = "Default::default")]
pub patch_sets: Vec<PatchSet>, // All patch sets pub patch_sets: Vec<PatchSet>, // All patch sets
#[serde(rename = "dependsOn")] #[serde(rename = "dependsOn", default = "Default::default")]
pub depends_on: Vec<ChangeDependency>, // Dependencies pub depends_on: Vec<ChangeDependency>, // Dependencies
#[serde(rename = "neededBy")] #[serde(rename = "neededBy", default = "Default::default")]
pub needed_by: Vec<ChangeDependency>, // Reverse dependencies pub needed_by: Vec<ChangeDependency>, // Reverse dependencies
#[serde(rename = "submitRecords")] #[serde(rename = "submitRecords", default = "Default::default")]
pub submit_records: Vec<SubmitRecord>, // Submission information pub submit_records: Vec<SubmitRecord>, // Submission information
#[serde(rename = "allReviewers")] #[serde(rename = "allReviewers", default = "Default::default")]
pub all_reviewers: Vec<Account>, // List of all reviewers pub all_reviewers: Vec<Account>, // List of all reviewers
} }
@ -213,7 +218,11 @@ impl From<Change> for crate::message::Change {
// While the change number is deprecated, we actually need it. // While the change number is deprecated, we actually need it.
// FIXME: enforce type level checking of this. // FIXME: enforce type level checking of this.
number: value.change_number.unwrap(), number: value.change_number.unwrap(),
head_sha: value.current_patch_set.revision, // FIXME: that's not good…
// basically, some events like `comment-added` does not include the current patch set
// because it's already included in another field, we could probably do another
// conversion pass to improve the type safety.
head_sha: value.current_patch_set.unwrap().revision,
} }
} }
} }
@ -224,18 +233,21 @@ pub struct RefUpdate {
pub old_rev: String, // The old value of the ref, prior to the update pub old_rev: String, // The old value of the ref, prior to the update
#[serde(rename = "newRev")] #[serde(rename = "newRev")]
pub new_rev: String, // The new value the ref was updated to pub new_rev: String, // The new value the ref was updated to
#[serde(rename = "refName")]
pub ref_name: String, // Full ref name within the project pub ref_name: String, // Full ref name within the project
pub project: String, // Project path in Gerrit pub project: String, // Project path in Gerrit
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", tag = "type")] #[serde(rename_all = "kebab-case", tag = "type")]
pub enum GerritStreamEvent { pub enum GerritStreamEvent {
ChangeAbandoned { ChangeAbandoned {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
abandoner: Account, abandoner: Account,
reason: String, reason: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
ChangeDeleted { ChangeDeleted {
@ -244,24 +256,32 @@ pub enum GerritStreamEvent {
}, },
ChangeMerged { ChangeMerged {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
submitter: Account, submitter: Account,
#[serde(rename = "newRev")]
new_rev: String, new_rev: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
ChangeRestored { ChangeRestored {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
restorer: Account, restorer: Account,
reason: String, reason: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
CommentAdded { CommentAdded {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
author: Account, author: Account,
#[serde(default = "Default::default")]
approvals: Vec<Approval>, approvals: Vec<Approval>,
comment: String, comment: Option<String>,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
DroppedOutput, DroppedOutput,
@ -271,43 +291,57 @@ pub enum GerritStreamEvent {
added: Vec<String>, added: Vec<String>,
removed: Vec<String>, removed: Vec<String>,
hashtags: Vec<String>, hashtags: Vec<String>,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
ProjectCreated { ProjectCreated {
#[serde(rename = "projectName")]
project_name: String, project_name: String,
#[serde(rename = "projectHead")]
project_head: String, project_head: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
PatchSetCreated { PatchSetCreated {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
uploader: Account, uploader: Account,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
RefUpdated { RefUpdated {
submitter: Account, submitter: Account,
#[serde(rename = "refUpdate")]
ref_update: RefUpdate, ref_update: RefUpdate,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
BatchRefUpdated { BatchRefUpdated {
submitter: Account, submitter: Account,
#[serde(rename = "refUpdates")]
ref_updates: Vec<RefUpdate>, ref_updates: Vec<RefUpdate>,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
ReviewerAdded { ReviewerAdded {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
reviewer: Account, reviewer: Account,
adder: Account, adder: Account,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
ReviewerDeleted { ReviewerDeleted {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
reviewer: Account, reviewer: Account,
remover: Account, remover: Account,
approvals: Vec<Approval>, approvals: Vec<Approval>,
comment: String, comment: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
TopicChanged { TopicChanged {
@ -315,22 +349,28 @@ pub enum GerritStreamEvent {
old_topic: Option<String>, old_topic: Option<String>,
new_topic: Option<String>, new_topic: Option<String>,
changer: Account, changer: Account,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
WorkInProgressStateChanged { WorkInProgressStateChanged {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
changer: Account, changer: Account,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
PrivateStateChanged { PrivateStateChanged {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
changer: Account, changer: Account,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
VoteDeleted { VoteDeleted {
change: Change, change: Change,
#[serde(rename = "patchSet")]
patch_set: PatchSet, patch_set: PatchSet,
reviewer: Account, reviewer: Account,
remover: Account, remover: Account,
@ -338,8 +378,11 @@ pub enum GerritStreamEvent {
comment: String, comment: String,
}, },
ProjectHeadUpdate { ProjectHeadUpdate {
#[serde(rename = "oldHead")]
old_head: String, old_head: String,
#[serde(rename = "newHead")]
new_head: String, new_head: String,
#[serde(rename = "eventCreatedOn")]
event_created_on: u64, event_created_on: u64,
}, },
} }

View file

@ -1,13 +1,14 @@
use std::error::Error; use std::{error::Error, path::PathBuf};
use async_std::{io::BufReader, path::PathBuf, stream::StreamExt}; use async_std::stream::StreamExt;
use async_stream::stream;
use futures_util::Stream; use futures_util::Stream;
use openssh::{Session, SessionBuilder}; use openssh::{Session, SessionBuilder, Stdio};
use tokio::io::AsyncBufReadExt;
use super::data_structures::GerritStreamEvent; use super::data_structures::GerritStreamEvent;
pub struct GerritSSHApi { pub struct GerritSSHApi {
private_key_file: PathBuf,
session: Session, session: Session,
} }
@ -19,7 +20,6 @@ impl GerritSSHApi {
&format!("Failed to launch SSH master to destination '{}'", uri), &format!("Failed to launch SSH master to destination '{}'", uri),
); );
Self { Self {
private_key_file,
session: Session::new_process_mux(tempdir), session: Session::new_process_mux(tempdir),
} }
} }
@ -27,32 +27,45 @@ impl GerritSSHApi {
pub async fn raw_command( pub async fn raw_command(
&mut self, &mut self,
args: Vec<&str>, args: Vec<&str>,
) -> Result<impl Stream<Item = String>, Box<dyn Error>> { ) -> Result<impl Stream<Item = Result<String, std::io::Error>> + '_, Box<dyn Error>> {
let child = self self.session
.check()
.await
.expect("Session is not in a good state.");
let mut child = self
.session .session
.raw_command("gerrit") .raw_command("gerrit")
.raw_args(args) .raw_args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn() .spawn()
.await .await
.expect("Failed to spawn a command"); .expect("Failed to spawn a command");
let stdout = BufReader::new(child.stdout().take().expect("Failed to obtain stdout")); Ok(stream! {
let lines = futures_util::stream::unfold(stdout.lines(), |mut lines| async { let child_stdout = child.stdout().take().expect("Failed to obtain stdout");
match lines.next().await { let stdout = tokio::io::BufReader::new(child_stdout);
Some(Ok(line)) => Some((line, lines)), let mut line_stream = stdout.lines();
Some(Err(_)) => None,
None => None,
}
});
Ok(lines) loop {
match line_stream.next_line().await {
Ok(Some(line)) => yield Ok(line),
Ok(None) => {
break;
}
Err(err) => yield Err(err)
}
}
})
} }
pub async fn stream_events( pub async fn stream_events(
&mut self, &mut self,
) -> Result<impl Stream<Item = Result<GerritStreamEvent, Box<dyn Error>>>, Box<dyn Error>> { ) -> Result<impl Stream<Item = Result<GerritStreamEvent, Box<dyn Error>>> + '_, Box<dyn Error>>
{
let lines = self.raw_command(vec!["stream-events"]).await?; let lines = self.raw_command(vec!["stream-events"]).await?;
let events = lines.map(|line| { let events = lines.filter_map(|line| line.ok()).map(|line| {
let event: Result<GerritStreamEvent, _> = serde_json::from_str(&line); let event: Result<GerritStreamEvent, _> = serde_json::from_str(&line);
match event { match event {
Ok(event) => Ok(event), Ok(event) => Ok(event),