feat: add pastebin internal RPC API

Extend SimpleWorker to take a AMQP channel.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-01 13:24:17 +01:00
parent 83a7f35b52
commit ae9e48630d
12 changed files with 184 additions and 56 deletions

View file

@ -1,3 +1,5 @@
use serde::{Deserialize, Serialize};
pub struct ConsumeConfig {
/// Specifies the name of the queue to consume from.
pub queue: String,
@ -263,6 +265,12 @@ pub trait ChannelExt {
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>;
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
fn send_request<T: ?Sized + Serialize + std::marker::Sync, U: for<'a> Deserialize<'a>>(
&mut self,
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> impl std::future::Future<Output = Result<U, Self::Error>> + Send;
}
pub trait ConsumerExt<'a, C> {

View file

@ -7,7 +7,7 @@ use crate::easyamqp::{
};
use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker};
use crate::ofborg;
use crate::worker::{Action, SimpleWorker};
use crate::worker::{prepare_queue_message, Action, SimpleWorker};
use async_std::future::Future;
use async_std::stream::StreamExt;
@ -17,8 +17,9 @@ use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::{AMQPValue, FieldTable};
use lapin::types::{AMQPValue, FieldTable, ShortString};
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
pub fn from_config(cfg: &RabbitMqConfig) -> Result<Connection, lapin::Error> {
@ -82,13 +83,71 @@ impl ChannelExt for Channel {
))?;
Ok(())
}
async fn send_request<T: ?Sized + Serialize, U: for<'a> Deserialize<'a>>(
&mut self,
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> Result<U, Self::Error> {
let mut msg = prepare_queue_message(exchange, routing_key, msg);
let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into();
let mut props = BasicProperties::default()
.with_reply_to("amq.rabbitmq.reply-to".into())
.with_correlation_id(correlation_id.clone())
.with_delivery_mode(2); // do not lose pastebins please
if let Some(s) = msg.content_type {
props = props.with_content_type(s.into());
}
trace!(?exchange, ?routing_key, "sending a RPC request");
let confirmation = self
.basic_publish(
&msg.exchange.take().unwrap_or_else(|| "".to_owned()),
&msg.routing_key.take().unwrap_or_else(|| "".to_owned()),
BasicPublishOptions::default(),
&msg.content,
props,
)
.await?
.await?;
trace!(?confirmation, "RPC request sent");
let mut consumer = self
.basic_consume(
"amq.rabbitmq.reply-to".into(),
"whoami",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
while let Some(Ok(deliver)) = consumer.next().await {
debug!(?deliver.delivery_tag, "received an RPC reply");
if let Some(deliver_correlation_id) = deliver.properties.correlation_id().as_ref() {
if deliver_correlation_id == &correlation_id {
trace!(?deliver_correlation_id, "received the expected RPC reply");
return Ok(serde_json::from_slice(&deliver.data).unwrap());
}
}
}
panic!("did not receive an RPC reply");
}
}
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {
type Error = lapin::Error;
type Handle = Pin<Box<dyn Future<Output = ()> + 'a>>;
fn consume(self, mut worker: W, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
fn consume(
mut self,
mut worker: W,
config: ConsumeConfig,
) -> Result<Self::Handle, Self::Error> {
let mut consumer = task::block_on(self.basic_consume(
&config.queue,
&config.consumer_tag,
@ -107,7 +166,7 @@ impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {
)
.expect("worker unexpected message consumed");
for action in worker.consumer(&job) {
for action in worker.consumer(&mut self, &job) {
action_deliver(&self, &deliver, action)
.await
.expect("action deliver failure");

View file

@ -47,6 +47,7 @@ fn label_from_title(title: &str) -> Vec<String> {
}
pub struct NixpkgsStrategy<'a> {
chan: lapin::Channel,
job: &'a EvaluationJob,
pull: &'a hubcaps::pulls::PullRequest,
issue: &'a Issue,
@ -62,6 +63,7 @@ pub struct NixpkgsStrategy<'a> {
impl<'a> NixpkgsStrategy<'a> {
#[allow(clippy::too_many_arguments)]
pub fn new(
chan: lapin::Channel,
job: &'a EvaluationJob,
pull: &'a hubcaps::pulls::PullRequest,
issue: &'a Issue,
@ -70,6 +72,7 @@ impl<'a> NixpkgsStrategy<'a> {
nix: Nix,
) -> NixpkgsStrategy<'a> {
Self {
chan,
job,
pull,
issue,
@ -205,7 +208,7 @@ impl<'a> NixpkgsStrategy<'a> {
}
fn update_rebuild_labels(
&self,
&mut self,
dir: &Path,
overall_status: &mut CommitStatus,
) -> Result<(), Error> {
@ -230,19 +233,25 @@ impl<'a> NixpkgsStrategy<'a> {
Ok(())
}
fn gist_changed_paths(&self, attrs: &[PackageArch]) -> Option<String> {
crate::utils::pastebin::make_pastebin(
fn gist_changed_paths(&mut self, attrs: &[PackageArch]) -> Option<String> {
async_std::task::block_on(crate::utils::pastebin::make_pastebin(
&mut self.chan,
"Changed Paths",
attrs
.iter()
.map(|attr| format!("{}\t{}", &attr.architecture, &attr.package))
.collect::<Vec<String>>()
.join("\n"),
)
))
.ok()
.map(|pp| pp.uri)
}
fn record_impacted_maintainers(&self, dir: &Path, attrs: &[PackageArch]) -> Result<(), Error> {
fn record_impacted_maintainers(
&mut self,
dir: &Path,
attrs: &[PackageArch],
) -> Result<(), Error> {
let changed_attributes = attrs
.iter()
.map(|attr| attr.package.split('.').collect::<Vec<&str>>())
@ -252,13 +261,15 @@ impl<'a> NixpkgsStrategy<'a> {
let m =
ImpactedMaintainers::calculate(&self.nix, dir, changed_paths, &changed_attributes);
let gist_url = crate::utils::pastebin::make_pastebin(
let gist_url = async_std::task::block_on(crate::utils::pastebin::make_pastebin(
&mut self.chan,
"Potential Maintainers",
match m {
Ok(ref maintainers) => format!("Maintainers:\n{}", maintainers),
Err(ref e) => format!("Ignorable calculation error:\n{:?}", e),
},
)
))
.ok()
.map(|pp| pp.uri);
let prefix = get_prefix(self.repo.statuses(), &self.job.pr.head_sha)?;
@ -304,7 +315,7 @@ impl<'a> NixpkgsStrategy<'a> {
Ok(())
}
fn check_meta_queue_builds(&self, dir: &Path) -> StepResult<Vec<BuildJob>> {
fn check_meta_queue_builds(&mut self, dir: &Path) -> StepResult<Vec<BuildJob>> {
if let Some(ref possibly_touched_packages) = self.touched_packages {
let prefix = get_prefix(self.repo.statuses(), &self.job.pr.head_sha)?;
@ -352,8 +363,13 @@ impl<'a> NixpkgsStrategy<'a> {
}
Err(out) => {
status.set_url(
crate::utils::pastebin::make_pastebin("Meta Check", out.display())
.map(|pp| pp.uri),
async_std::task::block_on(crate::utils::pastebin::make_pastebin(
&mut self.chan,
"Meta Check",
out.display(),
))
.ok()
.map(|pp| pp.uri),
);
status.set(hubcaps::statuses::State::Failure)?;
Err(Error::Fail(String::from(

View file

@ -73,7 +73,11 @@ impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E>
}
}
fn consumer(&mut self, job: &evaluationjob::EvaluationJob) -> worker::Actions {
fn consumer(
&mut self,
chan: &mut lapin::Channel,
job: &evaluationjob::EvaluationJob,
) -> worker::Actions {
let span = debug_span!("job", pr = ?job.pr.number);
let _enter = span.enter();
@ -95,7 +99,7 @@ impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E>
&self.cloner,
job,
)
.worker_actions()
.worker_actions(chan)
}
}
@ -180,26 +184,31 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> {
)
}
fn make_pastebin(&self, title: &str, contents: String) -> Option<PersistedPastebin> {
crate::utils::pastebin::make_pastebin(title, contents)
fn make_pastebin(
&mut self,
chan: &mut lapin::Channel,
title: &str,
contents: String,
) -> Option<PersistedPastebin> {
async_std::task::block_on(crate::utils::pastebin::make_pastebin(chan, title, contents)).ok()
}
fn worker_actions(&mut self) -> worker::Actions {
let eval_result = self.evaluate_job().map_err(|eval_error| match eval_error {
// Handle error cases which expect us to post statuses
// to github. Convert Eval Errors in to Result<_, CommitStatusWrite>
EvalWorkerError::EvalError(eval::Error::Fail(msg)) => {
self.update_status(msg, None, hubcaps::statuses::State::Failure)
}
EvalWorkerError::EvalError(eval::Error::FailWithPastebin(msg, title, content)) => self
.update_status(
msg,
self.make_pastebin(&title, content).map(|pp| pp.uri),
hubcaps::statuses::State::Failure,
),
EvalWorkerError::EvalError(eval::Error::CommitStatusWrite(e)) => Err(e),
EvalWorkerError::CommitStatusWrite(e) => Err(e),
});
fn worker_actions(&mut self, chan: &mut lapin::Channel) -> worker::Actions {
let eval_result = self
.evaluate_job(chan)
.map_err(|eval_error| match eval_error {
// Handle error cases which expect us to post statuses
// to github. Convert Eval Errors in to Result<_, CommitStatusWrite>
EvalWorkerError::EvalError(eval::Error::Fail(msg)) => {
self.update_status(msg, None, hubcaps::statuses::State::Failure)
}
EvalWorkerError::EvalError(eval::Error::FailWithPastebin(msg, title, content)) => {
let pastebin = self.make_pastebin(chan, &title, content).map(|pp| pp.uri);
self.update_status(msg, pastebin, hubcaps::statuses::State::Failure)
}
EvalWorkerError::EvalError(eval::Error::CommitStatusWrite(e)) => Err(e),
EvalWorkerError::CommitStatusWrite(e) => Err(e),
});
match eval_result {
Ok(eval_actions) => eval_actions,
@ -236,7 +245,10 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> {
// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn evaluate_job(&mut self) -> Result<worker::Actions, EvalWorkerError> {
fn evaluate_job(
&mut self,
chan: &mut lapin::Channel,
) -> Result<worker::Actions, EvalWorkerError> {
let job = self.job;
let repo = self
.client_app
@ -277,6 +289,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> {
let mut evaluation_strategy: Box<dyn eval::EvaluationStrategy> = if job.is_nixpkgs() {
Box::new(eval::NixpkgsStrategy::new(
chan.clone(),
job,
&pull,
&issue,
@ -408,6 +421,7 @@ impl<'a, E: stats::SysEvents + 'static> OneEval<'a, E> {
state = hubcaps::statuses::State::Failure;
gist_url = self
.make_pastebin(
chan,
&format!("[{}] Evaluation of {}", prefix, check.name()),
file_to_str(&mut out),
)

View file

@ -29,7 +29,11 @@ impl worker::SimpleWorker for EvaluationFilterWorker {
}
}
fn consumer(&mut self, job: &ghevent::PullRequestEvent) -> worker::Actions {
fn consumer(
&mut self,
_chan: &mut lapin::Channel,
job: &ghevent::PullRequestEvent,
) -> worker::Actions {
let span = debug_span!("job", pr = ?job.number);
let _enter = span.enter();

View file

@ -36,7 +36,11 @@ impl worker::SimpleWorker for GitHubCommentWorker {
// FIXME: remove with rust/cargo update
#[allow(clippy::cognitive_complexity)]
fn consumer(&mut self, job: &ghevent::IssueComment) -> worker::Actions {
fn consumer(
&mut self,
_chan: &mut lapin::Channel,
job: &ghevent::IssueComment,
) -> worker::Actions {
let span = debug_span!("job", pr = ?job.issue.number);
let _enter = span.enter();

View file

@ -46,7 +46,7 @@ impl worker::SimpleWorker for GitHubCommentPoster {
PostableEvent::from(body)
}
fn consumer(&mut self, job: &PostableEvent) -> worker::Actions {
fn consumer(&mut self, _chan: &mut lapin::Channel, job: &PostableEvent) -> worker::Actions {
let mut checks: Vec<CheckRunOptions> = vec![];
let repo: Repo;

View file

@ -210,7 +210,7 @@ impl worker::SimpleWorker for LogMessageCollector {
})
}
fn consumer(&mut self, job: &LogMessage) -> worker::Actions {
fn consumer(&mut self, _chan: &mut lapin::Channel, job: &LogMessage) -> worker::Actions {
match job.message {
MsgType::Start(ref start) => {
self.write_metadata(&job.from, start)

View file

@ -47,7 +47,7 @@ impl worker::SimpleWorker for PastebinCollector {
}
}
fn consumer(&mut self, job: &Self::J) -> worker::Actions {
fn consumer(&mut self, _chan: &mut lapin::Channel, job: &Self::J) -> worker::Actions {
let span = debug_span!("pastebin", title = ?job.title);
let _enter = span.enter();

View file

@ -49,7 +49,11 @@ impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker
}
}
fn consumer(&mut self, job: &stats::EventMessage) -> worker::Actions {
fn consumer(
&mut self,
_chan: &mut lapin::Channel,
job: &stats::EventMessage,
) -> worker::Actions {
let sender = job.sender.clone();
for event in job.events.iter() {
self.collector.record(sender.clone(), event.clone());

View file

@ -3,6 +3,9 @@
use std::path::PathBuf;
use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
use lapin::Channel;
use crate::easyamqp::ChannelExt;
#[derive(Serialize, Deserialize, Debug)]
pub struct Pastebin {
@ -31,14 +34,16 @@ impl From<&Pastebin> for PersistedPastebin {
}
}
pub fn make_pastebin(title: &str, contents: String) -> Option<PersistedPastebin> {
pub async fn make_pastebin(
chan: &mut Channel,
title: &str,
contents: String,
) -> Result<PersistedPastebin, lapin::Error> {
let pastebin = Pastebin {
title: title.to_owned(),
contents,
};
// wait for a reply?
//async_std::task::block_on(publish_serde_action(pastebin));
None
chan.send_request(Some("pastebin-log"), None, &pastebin)
.await
}

View file

@ -24,6 +24,24 @@ pub struct QueueMsg {
pub content: Vec<u8>,
}
pub fn prepare_queue_message<T: ?Sized>(
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> QueueMsg
where
T: Serialize,
{
QueueMsg {
exchange: exchange.map(|s| s.to_owned()),
routing_key: routing_key.map(|s| s.to_owned()),
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}
}
pub fn publish_serde_action<T: ?Sized>(
exchange: Option<String>,
routing_key: Option<String>,
@ -32,20 +50,16 @@ pub fn publish_serde_action<T: ?Sized>(
where
T: Serialize,
{
Action::Publish(Box::new(QueueMsg {
exchange,
routing_key,
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}))
Action::Publish(Box::new(prepare_queue_message(
exchange.as_deref(),
routing_key.as_deref(),
msg,
)))
}
pub trait SimpleWorker: Send {
type J: Send;
fn consumer(&mut self, job: &Self::J) -> Actions;
fn consumer(&mut self, chan: &mut lapin::Channel, job: &Self::J) -> Actions;
fn msg_to_job(
&mut self,