diff --git a/Cargo.lock b/Cargo.lock index 5a2265d..8658080 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -384,6 +384,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.7.1" @@ -960,6 +982,7 @@ checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", + "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -982,6 +1005,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -1045,10 +1079,13 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -1716,12 +1753,14 @@ name = "ofborg" version = "0.90.0" dependencies = [ "async-std", + "async-stream", "base64 0.22.1", "brace-expand", "chrono", "clap", "either", "fs2", + "futures", "futures-util", "http", "hubcaps", @@ -1742,6 +1781,8 @@ dependencies = [ "sys-info", "tempfile", "thiserror 1.0.69", + "tokio", + "tokio-stream", "tracing", "tracing-subscriber", "uuid", @@ -2858,6 +2899,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" diff --git a/dev.config.json b/dev.config.json index 42a2989..cf0f1d8 100644 --- a/dev.config.json +++ b/dev.config.json @@ -5,6 +5,10 @@ "checkout": { "root": "$STATE_DIRECTORY/.checkouts" }, + "vcs": "Gerrit", + "gerrit": { + "instance_uri": "cl.forkos.org" + }, "nix": { "system": "x86_64-linux", "remote": "daemon", diff --git a/ofborg/Cargo.toml b/ofborg/Cargo.toml index df15674..4fa4ac6 100644 --- a/ofborg/Cargo.toml +++ b/ofborg/Cargo.toml @@ -44,4 +44,8 @@ jfs = "0.9.0" base64 = "0.22.1" thiserror = "1.0.67" openssh = { version = "0.11.3", features = ["process-mux"], default-features = false } +futures = "0.3.31" +tokio = "1.41.1" +tokio-stream = { version = "0.1.16", features = ["io-util"] } +async-stream = "0.3.6" # reqwest = "0.12.9" diff --git a/ofborg/src/bin/listen-gerrit-events.rs b/ofborg/src/bin/listen-gerrit-events.rs new file mode 100644 index 0000000..e2c82f6 --- /dev/null +++ b/ofborg/src/bin/listen-gerrit-events.rs @@ -0,0 +1,42 @@ +/// This is a Gerrit listener for events which puts them on stdout for debugging purposes. +/// The list of event type listened to is static. +use std::env; +use std::error::Error; + +use async_std::stream::StreamExt; +use async_std::task; +use futures::pin_mut; +use ofborg::vcs::gerrit::ssh::GerritSSHApi; +use tracing::info; + +use ofborg::config; + +fn main() -> Result<(), Box> { + ofborg::setup_log(); + + let arg = env::args() + .nth(1) + .expect("usage: listen-gerrit-events "); + let cfg = config::load(arg.as_ref()); + + let gerrit_cfg = cfg + .gerrit + .expect("Gerrit event streaming requires a Gerrit configuration"); + let gerrit_ssh_uri = format!("ssh://{}:{}", gerrit_cfg.instance_uri, gerrit_cfg.ssh_port); + info!("Listening events from Gerrit on {}", gerrit_ssh_uri); + let mut gerrit_api = GerritSSHApi::new(gerrit_cfg.ssh_private_key_file, &gerrit_ssh_uri); + + task::block_on(async { + let event_stream = gerrit_api.stream_events().await.unwrap(); + pin_mut!(event_stream); + loop { + let thing = event_stream.next().await; + println!("{:?}", thing); + if let Some(Ok(event)) = thing { + println!("{:#?}", event); + } + } + }); + + Ok(()) +} diff --git a/ofborg/src/vcs/gerrit/data_structures.rs b/ofborg/src/vcs/gerrit/data_structures.rs index b3c774e..8875a1c 100644 --- a/ofborg/src/vcs/gerrit/data_structures.rs +++ b/ofborg/src/vcs/gerrit/data_structures.rs @@ -28,12 +28,12 @@ impl From> for crate::vcs::generic::ChangeReviewers { pub struct Approval { pub r#type: String, // Internal name of the approval pub description: String, // Human-readable category of the approval - pub value: i32, // Value assigned by the approval (usually a numerical score) + pub value: String, // Value assigned by the approval (usually a numerical score) #[serde(rename = "oldValue")] - pub old_value: Option, // Previous approval score, if present + pub old_value: Option, // Previous approval score, if present #[serde(rename = "grantedOn")] - pub granted_on: u64, // Time in seconds since the UNIX epoch - pub by: Account, // Reviewer of the patch set + pub granted_on: Option, // Time in seconds since the UNIX epoch + pub by: Option, // Reviewer of the patch set } #[derive(Serialize, Deserialize, Debug)] @@ -87,9 +87,12 @@ pub struct PatchSet { #[serde(rename = "createdOn")] pub created_on: u64, // Time in seconds since the UNIX epoch pub kind: Kind, // Kind of change ("REWORK", "TRIVIAL_REBASE", etc.) + #[serde(default = "Default::default")] pub approvals: Vec, // Approvals granted + #[serde(default = "Default::default")] pub comments: Vec, // All comments for this patchset - pub files: Vec, // All changed files in this patchset + #[serde(default = "Default::default")] + pub files: Vec, // All changed files in this patchset #[serde(rename = "sizeInsertions")] pub size_insertions: i64, // Size of insertions #[serde(rename = "sizeDeletions")] @@ -179,30 +182,32 @@ pub struct Change { pub subject: String, pub owner: Account, pub url: String, - pub commit_message: String, + pub commit_message: Option, + #[serde(default = "Default::default")] pub hashtags: Vec, #[serde(rename = "createdOn")] pub created_on: u64, // Time in seconds since UNIX epoch #[serde(rename = "lastUpdated")] - pub last_updated: u64, // Time in seconds since UNIX epoch - pub open: bool, + pub last_updated: Option, // Time in seconds since UNIX epoch + pub open: Option, pub status: ChangeStatus, // "NEW", "MERGED", or "ABANDONED" - pub private: bool, - pub wip: bool, // Work in progress + pub private: Option, + pub wip: Option, // Work in progress + #[serde(default = "Default::default")] pub comments: Vec, // Inline/file comments - #[serde(rename = "trackingIds")] + #[serde(rename = "trackingIds", default = "Default::default")] pub tracking_ids: Vec, // Links to issue tracking systems #[serde(rename = "currentPatchSet")] - pub current_patch_set: PatchSet, - #[serde(rename = "patchSets")] + pub current_patch_set: Option, + #[serde(rename = "patchSets", default = "Default::default")] pub patch_sets: Vec, // All patch sets - #[serde(rename = "dependsOn")] + #[serde(rename = "dependsOn", default = "Default::default")] pub depends_on: Vec, // Dependencies - #[serde(rename = "neededBy")] + #[serde(rename = "neededBy", default = "Default::default")] pub needed_by: Vec, // Reverse dependencies - #[serde(rename = "submitRecords")] + #[serde(rename = "submitRecords", default = "Default::default")] pub submit_records: Vec, // Submission information - #[serde(rename = "allReviewers")] + #[serde(rename = "allReviewers", default = "Default::default")] pub all_reviewers: Vec, // List of all reviewers } @@ -213,7 +218,8 @@ impl From for crate::message::Change { // While the change number is deprecated, we actually need it. // FIXME: enforce type level checking of this. number: value.change_number.unwrap(), - head_sha: value.current_patch_set.revision, + // FIXME: that's not good… + head_sha: value.current_patch_set.unwrap().revision, } } } @@ -224,18 +230,21 @@ pub struct RefUpdate { pub old_rev: String, // The old value of the ref, prior to the update #[serde(rename = "newRev")] pub new_rev: String, // The new value the ref was updated to + #[serde(rename = "refName")] pub ref_name: String, // Full ref name within the project - pub project: String, // Project path in Gerrit + pub project: String, // Project path in Gerrit } #[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase", tag = "type")] +#[serde(rename_all = "kebab-case", tag = "type")] pub enum GerritStreamEvent { ChangeAbandoned { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, abandoner: Account, reason: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, ChangeDeleted { @@ -244,24 +253,32 @@ pub enum GerritStreamEvent { }, ChangeMerged { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, submitter: Account, + #[serde(rename = "newRev")] new_rev: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, ChangeRestored { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, restorer: Account, reason: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, CommentAdded { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, author: Account, + #[serde(default = "Default::default")] approvals: Vec, - comment: String, + comment: Option, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, DroppedOutput, @@ -271,43 +288,57 @@ pub enum GerritStreamEvent { added: Vec, removed: Vec, hashtags: Vec, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, ProjectCreated { + #[serde(rename = "projectName")] project_name: String, + #[serde(rename = "projectHead")] project_head: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, PatchSetCreated { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, uploader: Account, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, RefUpdated { submitter: Account, + #[serde(rename = "refUpdate")] ref_update: RefUpdate, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, BatchRefUpdated { submitter: Account, + #[serde(rename = "refUpdates")] ref_updates: Vec, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, ReviewerAdded { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, reviewer: Account, adder: Account, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, ReviewerDeleted { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, reviewer: Account, remover: Account, approvals: Vec, comment: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, TopicChanged { @@ -315,22 +346,28 @@ pub enum GerritStreamEvent { old_topic: Option, new_topic: Option, changer: Account, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, WorkInProgressStateChanged { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, changer: Account, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, PrivateStateChanged { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, changer: Account, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, VoteDeleted { change: Change, + #[serde(rename = "patchSet")] patch_set: PatchSet, reviewer: Account, remover: Account, @@ -338,8 +375,11 @@ pub enum GerritStreamEvent { comment: String, }, ProjectHeadUpdate { + #[serde(rename = "oldHead")] old_head: String, + #[serde(rename = "newHead")] new_head: String, + #[serde(rename = "eventCreatedOn")] event_created_on: u64, }, } diff --git a/ofborg/src/vcs/gerrit/ssh.rs b/ofborg/src/vcs/gerrit/ssh.rs index eb1c3e3..2372e4f 100644 --- a/ofborg/src/vcs/gerrit/ssh.rs +++ b/ofborg/src/vcs/gerrit/ssh.rs @@ -1,13 +1,14 @@ -use std::error::Error; +use std::{error::Error, path::PathBuf}; -use async_std::{io::BufReader, path::PathBuf, stream::StreamExt}; +use async_std::stream::StreamExt; +use async_stream::stream; use futures_util::Stream; -use openssh::{Session, SessionBuilder}; +use openssh::{Session, SessionBuilder, Stdio}; +use tokio::io::AsyncBufReadExt; use super::data_structures::GerritStreamEvent; pub struct GerritSSHApi { - private_key_file: PathBuf, session: Session, } @@ -19,7 +20,6 @@ impl GerritSSHApi { &format!("Failed to launch SSH master to destination '{}'", uri), ); Self { - private_key_file, session: Session::new_process_mux(tempdir), } } @@ -27,32 +27,45 @@ impl GerritSSHApi { pub async fn raw_command( &mut self, args: Vec<&str>, - ) -> Result, Box> { - let child = self + ) -> Result> + '_, Box> { + self.session + .check() + .await + .expect("Session is not in a good state."); + + let mut child = self .session .raw_command("gerrit") .raw_args(args) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .spawn() .await .expect("Failed to spawn a command"); - let stdout = BufReader::new(child.stdout().take().expect("Failed to obtain stdout")); - let lines = futures_util::stream::unfold(stdout.lines(), |mut lines| async { - match lines.next().await { - Some(Ok(line)) => Some((line, lines)), - Some(Err(_)) => None, - None => None, - } - }); + Ok(stream! { + let child_stdout = child.stdout().take().expect("Failed to obtain stdout"); + let stdout = tokio::io::BufReader::new(child_stdout); + let mut line_stream = stdout.lines(); - Ok(lines) + loop { + match line_stream.next_line().await { + Ok(Some(line)) => yield Ok(line), + Ok(None) => { + break; + } + Err(err) => yield Err(err) + } + } + }) } pub async fn stream_events( &mut self, - ) -> Result>>, Box> { + ) -> Result>> + '_, Box> + { let lines = self.raw_command(vec!["stream-events"]).await?; - let events = lines.map(|line| { + let events = lines.filter_map(|line| line.ok()).map(|line| { let event: Result = serde_json::from_str(&line); match event { Ok(event) => Ok(event),