diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index ccdba9f..0e5bc16 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -1,3 +1,5 @@ +use serde::{Deserialize, Serialize}; + pub struct ConsumeConfig { /// Specifies the name of the queue to consume from. pub queue: String, @@ -263,6 +265,12 @@ pub trait ChannelExt { fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>; fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; + fn send_request Deserialize<'a>>( + &mut self, + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, + ) -> impl std::future::Future> + Send; } pub trait ConsumerExt<'a, C> { diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 5be7278..aa1daac 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -7,7 +7,7 @@ use crate::easyamqp::{ }; use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker}; use crate::ofborg; -use crate::worker::{Action, SimpleWorker}; +use crate::worker::{prepare_queue_message, Action, SimpleWorker}; use async_std::future::Future; use async_std::stream::StreamExt; @@ -17,8 +17,9 @@ use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, }; -use lapin::types::{AMQPValue, FieldTable}; +use lapin::types::{AMQPValue, FieldTable, ShortString}; use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind}; +use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; pub fn from_config(cfg: &RabbitMqConfig) -> Result { @@ -82,13 +83,71 @@ impl ChannelExt for Channel { ))?; Ok(()) } + + async fn send_request Deserialize<'a>>( + &mut self, + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, + ) -> Result { + let mut msg = prepare_queue_message(exchange, routing_key, msg); + + let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into(); + let mut props = BasicProperties::default() + .with_reply_to("amq.rabbitmq.reply-to".into()) + .with_correlation_id(correlation_id.clone()) + .with_delivery_mode(2); // do not lose pastebins please + + if let Some(s) = msg.content_type { + props = props.with_content_type(s.into()); + } + + trace!(?exchange, ?routing_key, "sending a RPC request"); + let confirmation = self + .basic_publish( + &msg.exchange.take().unwrap_or_else(|| "".to_owned()), + &msg.routing_key.take().unwrap_or_else(|| "".to_owned()), + BasicPublishOptions::default(), + &msg.content, + props, + ) + .await? + .await?; + + trace!(?confirmation, "RPC request sent"); + + let mut consumer = self + .basic_consume( + "amq.rabbitmq.reply-to".into(), + "whoami", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(Ok(deliver)) = consumer.next().await { + debug!(?deliver.delivery_tag, "received an RPC reply"); + if let Some(deliver_correlation_id) = deliver.properties.correlation_id().as_ref() { + if deliver_correlation_id == &correlation_id { + trace!(?deliver_correlation_id, "received the expected RPC reply"); + return Ok(serde_json::from_slice(&deliver.data).unwrap()); + } + } + } + + panic!("did not receive an RPC reply"); + } } impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { type Error = lapin::Error; type Handle = Pin + 'a>>; - fn consume(self, mut worker: W, config: ConsumeConfig) -> Result { + fn consume( + mut self, + mut worker: W, + config: ConsumeConfig, + ) -> Result { let mut consumer = task::block_on(self.basic_consume( &config.queue, &config.consumer_tag, @@ -107,7 +166,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { ) .expect("worker unexpected message consumed"); - for action in worker.consumer(&job) { + for action in worker.consumer(&mut self, &job) { action_deliver(&self, &deliver, action) .await .expect("action deliver failure"); diff --git a/ofborg/src/tasks/eval/nixpkgs.rs b/ofborg/src/tasks/eval/nixpkgs.rs index 7153adb..ae1ddef 100644 --- a/ofborg/src/tasks/eval/nixpkgs.rs +++ b/ofborg/src/tasks/eval/nixpkgs.rs @@ -47,6 +47,7 @@ fn label_from_title(title: &str) -> Vec { } pub struct NixpkgsStrategy<'a> { + chan: lapin::Channel, job: &'a EvaluationJob, pull: &'a hubcaps::pulls::PullRequest, issue: &'a Issue, @@ -62,6 +63,7 @@ pub struct NixpkgsStrategy<'a> { impl<'a> NixpkgsStrategy<'a> { #[allow(clippy::too_many_arguments)] pub fn new( + chan: lapin::Channel, job: &'a EvaluationJob, pull: &'a hubcaps::pulls::PullRequest, issue: &'a Issue, @@ -70,6 +72,7 @@ impl<'a> NixpkgsStrategy<'a> { nix: Nix, ) -> NixpkgsStrategy<'a> { Self { + chan, job, pull, issue, @@ -205,7 +208,7 @@ impl<'a> NixpkgsStrategy<'a> { } fn update_rebuild_labels( - &self, + &mut self, dir: &Path, overall_status: &mut CommitStatus, ) -> Result<(), Error> { @@ -230,19 +233,25 @@ impl<'a> NixpkgsStrategy<'a> { Ok(()) } - fn gist_changed_paths(&self, attrs: &[PackageArch]) -> Option { - crate::utils::pastebin::make_pastebin( + fn gist_changed_paths(&mut self, attrs: &[PackageArch]) -> Option { + async_std::task::block_on(crate::utils::pastebin::make_pastebin( + &mut self.chan, "Changed Paths", attrs .iter() .map(|attr| format!("{}\t{}", &attr.architecture, &attr.package)) .collect::>() .join("\n"), - ) + )) + .ok() .map(|pp| pp.uri) } - fn record_impacted_maintainers(&self, dir: &Path, attrs: &[PackageArch]) -> Result<(), Error> { + fn record_impacted_maintainers( + &mut self, + dir: &Path, + attrs: &[PackageArch], + ) -> Result<(), Error> { let changed_attributes = attrs .iter() .map(|attr| attr.package.split('.').collect::>()) @@ -252,13 +261,15 @@ impl<'a> NixpkgsStrategy<'a> { let m = ImpactedMaintainers::calculate(&self.nix, dir, changed_paths, &changed_attributes); - let gist_url = crate::utils::pastebin::make_pastebin( + let gist_url = async_std::task::block_on(crate::utils::pastebin::make_pastebin( + &mut self.chan, "Potential Maintainers", match m { Ok(ref maintainers) => format!("Maintainers:\n{}", maintainers), Err(ref e) => format!("Ignorable calculation error:\n{:?}", e), }, - ) + )) + .ok() .map(|pp| pp.uri); let prefix = get_prefix(self.repo.statuses(), &self.job.pr.head_sha)?; @@ -304,7 +315,7 @@ impl<'a> NixpkgsStrategy<'a> { Ok(()) } - fn check_meta_queue_builds(&self, dir: &Path) -> StepResult> { + fn check_meta_queue_builds(&mut self, dir: &Path) -> StepResult> { if let Some(ref possibly_touched_packages) = self.touched_packages { let prefix = get_prefix(self.repo.statuses(), &self.job.pr.head_sha)?; @@ -352,8 +363,13 @@ impl<'a> NixpkgsStrategy<'a> { } Err(out) => { status.set_url( - crate::utils::pastebin::make_pastebin("Meta Check", out.display()) - .map(|pp| pp.uri), + async_std::task::block_on(crate::utils::pastebin::make_pastebin( + &mut self.chan, + "Meta Check", + out.display(), + )) + .ok() + .map(|pp| pp.uri), ); status.set(hubcaps::statuses::State::Failure)?; Err(Error::Fail(String::from( diff --git a/ofborg/src/tasks/evaluate.rs b/ofborg/src/tasks/evaluate.rs index 3d325cb..95e57d4 100644 --- a/ofborg/src/tasks/evaluate.rs +++ b/ofborg/src/tasks/evaluate.rs @@ -73,7 +73,11 @@ impl worker::SimpleWorker for EvaluationWorker } } - fn consumer(&mut self, job: &evaluationjob::EvaluationJob) -> worker::Actions { + fn consumer( + &mut self, + chan: &mut lapin::Channel, + job: &evaluationjob::EvaluationJob, + ) -> worker::Actions { let span = debug_span!("job", pr = ?job.pr.number); let _enter = span.enter(); @@ -95,7 +99,7 @@ impl worker::SimpleWorker for EvaluationWorker &self.cloner, job, ) - .worker_actions() + .worker_actions(chan) } } @@ -180,26 +184,31 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { ) } - fn make_pastebin(&self, title: &str, contents: String) -> Option { - crate::utils::pastebin::make_pastebin(title, contents) + fn make_pastebin( + &mut self, + chan: &mut lapin::Channel, + title: &str, + contents: String, + ) -> Option { + async_std::task::block_on(crate::utils::pastebin::make_pastebin(chan, title, contents)).ok() } - fn worker_actions(&mut self) -> worker::Actions { - let eval_result = self.evaluate_job().map_err(|eval_error| match eval_error { - // Handle error cases which expect us to post statuses - // to github. Convert Eval Errors in to Result<_, CommitStatusWrite> - EvalWorkerError::EvalError(eval::Error::Fail(msg)) => { - self.update_status(msg, None, hubcaps::statuses::State::Failure) - } - EvalWorkerError::EvalError(eval::Error::FailWithPastebin(msg, title, content)) => self - .update_status( - msg, - self.make_pastebin(&title, content).map(|pp| pp.uri), - hubcaps::statuses::State::Failure, - ), - EvalWorkerError::EvalError(eval::Error::CommitStatusWrite(e)) => Err(e), - EvalWorkerError::CommitStatusWrite(e) => Err(e), - }); + fn worker_actions(&mut self, chan: &mut lapin::Channel) -> worker::Actions { + let eval_result = self + .evaluate_job(chan) + .map_err(|eval_error| match eval_error { + // Handle error cases which expect us to post statuses + // to github. Convert Eval Errors in to Result<_, CommitStatusWrite> + EvalWorkerError::EvalError(eval::Error::Fail(msg)) => { + self.update_status(msg, None, hubcaps::statuses::State::Failure) + } + EvalWorkerError::EvalError(eval::Error::FailWithPastebin(msg, title, content)) => { + let pastebin = self.make_pastebin(chan, &title, content).map(|pp| pp.uri); + self.update_status(msg, pastebin, hubcaps::statuses::State::Failure) + } + EvalWorkerError::EvalError(eval::Error::CommitStatusWrite(e)) => Err(e), + EvalWorkerError::CommitStatusWrite(e) => Err(e), + }); match eval_result { Ok(eval_actions) => eval_actions, @@ -236,7 +245,10 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { // FIXME: remove with rust/cargo update #[allow(clippy::cognitive_complexity)] - fn evaluate_job(&mut self) -> Result { + fn evaluate_job( + &mut self, + chan: &mut lapin::Channel, + ) -> Result { let job = self.job; let repo = self .client_app @@ -277,6 +289,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { let mut evaluation_strategy: Box = if job.is_nixpkgs() { Box::new(eval::NixpkgsStrategy::new( + chan.clone(), job, &pull, &issue, @@ -408,6 +421,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> { state = hubcaps::statuses::State::Failure; gist_url = self .make_pastebin( + chan, &format!("[{}] Evaluation of {}", prefix, check.name()), file_to_str(&mut out), ) diff --git a/ofborg/src/tasks/evaluationfilter.rs b/ofborg/src/tasks/evaluationfilter.rs index b45bebb..2fb556d 100644 --- a/ofborg/src/tasks/evaluationfilter.rs +++ b/ofborg/src/tasks/evaluationfilter.rs @@ -29,7 +29,11 @@ impl worker::SimpleWorker for EvaluationFilterWorker { } } - fn consumer(&mut self, job: &ghevent::PullRequestEvent) -> worker::Actions { + fn consumer( + &mut self, + _chan: &mut lapin::Channel, + job: &ghevent::PullRequestEvent, + ) -> worker::Actions { let span = debug_span!("job", pr = ?job.number); let _enter = span.enter(); diff --git a/ofborg/src/tasks/githubcommentfilter.rs b/ofborg/src/tasks/githubcommentfilter.rs index 7312111..e2f11d9 100644 --- a/ofborg/src/tasks/githubcommentfilter.rs +++ b/ofborg/src/tasks/githubcommentfilter.rs @@ -36,7 +36,11 @@ impl worker::SimpleWorker for GitHubCommentWorker { // FIXME: remove with rust/cargo update #[allow(clippy::cognitive_complexity)] - fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions { + fn consumer( + &mut self, + _chan: &mut lapin::Channel, + job: &ghevent::IssueComment, + ) -> worker::Actions { let span = debug_span!("job", pr = ?job.issue.number); let _enter = span.enter(); diff --git a/ofborg/src/tasks/githubcommentposter.rs b/ofborg/src/tasks/githubcommentposter.rs index 2da15ae..f11ff6d 100644 --- a/ofborg/src/tasks/githubcommentposter.rs +++ b/ofborg/src/tasks/githubcommentposter.rs @@ -46,7 +46,7 @@ impl worker::SimpleWorker for GitHubCommentPoster { PostableEvent::from(body) } - fn consumer(&mut self, job: &PostableEvent) -> worker::Actions { + fn consumer(&mut self, _chan: &mut lapin::Channel, job: &PostableEvent) -> worker::Actions { let mut checks: Vec = vec![]; let repo: Repo; diff --git a/ofborg/src/tasks/log_message_collector.rs b/ofborg/src/tasks/log_message_collector.rs index 8cc4a95..8de7260 100644 --- a/ofborg/src/tasks/log_message_collector.rs +++ b/ofborg/src/tasks/log_message_collector.rs @@ -210,7 +210,7 @@ impl worker::SimpleWorker for LogMessageCollector { }) } - fn consumer(&mut self, job: &LogMessage) -> worker::Actions { + fn consumer(&mut self, _chan: &mut lapin::Channel, job: &LogMessage) -> worker::Actions { match job.message { MsgType::Start(ref start) => { self.write_metadata(&job.from, start) diff --git a/ofborg/src/tasks/pastebin_collector.rs b/ofborg/src/tasks/pastebin_collector.rs index 0d102c6..b041a63 100644 --- a/ofborg/src/tasks/pastebin_collector.rs +++ b/ofborg/src/tasks/pastebin_collector.rs @@ -47,7 +47,7 @@ impl worker::SimpleWorker for PastebinCollector { } } - fn consumer(&mut self, job: &Self::J) -> worker::Actions { + fn consumer(&mut self, _chan: &mut lapin::Channel, job: &Self::J) -> worker::Actions { let span = debug_span!("pastebin", title = ?job.title); let _enter = span.enter(); diff --git a/ofborg/src/tasks/statscollector.rs b/ofborg/src/tasks/statscollector.rs index c28707b..be3458c 100644 --- a/ofborg/src/tasks/statscollector.rs +++ b/ofborg/src/tasks/statscollector.rs @@ -49,7 +49,11 @@ impl worker::SimpleWorker for StatCollectorWorker } } - fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions { + fn consumer( + &mut self, + _chan: &mut lapin::Channel, + job: &stats::EventMessage, + ) -> worker::Actions { let sender = job.sender.clone(); for event in job.events.iter() { self.collector.record(sender.clone(), event.clone()); diff --git a/ofborg/src/utils/pastebin.rs b/ofborg/src/utils/pastebin.rs index b63c8a4..8c6d193 100644 --- a/ofborg/src/utils/pastebin.rs +++ b/ofborg/src/utils/pastebin.rs @@ -3,6 +3,9 @@ use std::path::PathBuf; use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine}; +use lapin::Channel; + +use crate::easyamqp::ChannelExt; #[derive(Serialize, Deserialize, Debug)] pub struct Pastebin { @@ -31,14 +34,16 @@ impl From<&Pastebin> for PersistedPastebin { } } -pub fn make_pastebin(title: &str, contents: String) -> Option { +pub async fn make_pastebin( + chan: &mut Channel, + title: &str, + contents: String, +) -> Result { let pastebin = Pastebin { title: title.to_owned(), contents, }; - // wait for a reply? - //async_std::task::block_on(publish_serde_action(pastebin)); - - None + chan.send_request(Some("pastebin-log"), None, &pastebin) + .await } diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index ec63dc7..122103c 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -24,6 +24,24 @@ pub struct QueueMsg { pub content: Vec, } +pub fn prepare_queue_message( + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, +) -> QueueMsg +where + T: Serialize, +{ + QueueMsg { + exchange: exchange.map(|s| s.to_owned()), + routing_key: routing_key.map(|s| s.to_owned()), + mandatory: false, + immediate: false, + content_type: Some("application/json".to_owned()), + content: serde_json::to_string(&msg).unwrap().into_bytes(), + } +} + pub fn publish_serde_action( exchange: Option, routing_key: Option, @@ -32,20 +50,16 @@ pub fn publish_serde_action( where T: Serialize, { - Action::Publish(Box::new(QueueMsg { - exchange, - routing_key, - mandatory: false, - immediate: false, - content_type: Some("application/json".to_owned()), - content: serde_json::to_string(&msg).unwrap().into_bytes(), - })) + Action::Publish(Box::new(prepare_queue_message( + exchange.as_deref(), + routing_key.as_deref(), + msg, + ))) } pub trait SimpleWorker: Send { type J: Send; - - fn consumer(&mut self, job: &Self::J) -> Actions; + fn consumer(&mut self, chan: &mut lapin::Channel, job: &Self::J) -> Actions; fn msg_to_job( &mut self,