From 326cf58e148370b9991117c8125fc4f4fbd55cab Mon Sep 17 00:00:00 2001 From: Raito Bezarius Date: Fri, 1 Nov 2024 13:24:17 +0100 Subject: [PATCH] feat: add pastebin internal RPC API Signed-off-by: Raito Bezarius --- ofborg/src/easyamqp.rs | 8 +++++ ofborg/src/easylapin.rs | 59 ++++++++++++++++++++++++++++++++++-- ofborg/src/utils/pastebin.rs | 15 ++++++--- ofborg/src/worker.rs | 31 ++++++++++++++----- 4 files changed, 98 insertions(+), 15 deletions(-) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index ccdba9f..eb83d9d 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -1,3 +1,5 @@ +use serde::{Deserialize, Serialize}; + pub struct ConsumeConfig { /// Specifies the name of the queue to consume from. pub queue: String, @@ -263,6 +265,12 @@ pub trait ChannelExt { fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>; fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>; fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; + async fn send_request Deserialize<'a>>( + &mut self, + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, + ) -> Result; } pub trait ConsumerExt<'a, C> { diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 5be7278..2008f29 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -7,7 +7,7 @@ use crate::easyamqp::{ }; use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker}; use crate::ofborg; -use crate::worker::{Action, SimpleWorker}; +use crate::worker::{prepare_queue_message, Action, SimpleWorker}; use async_std::future::Future; use async_std::stream::StreamExt; @@ -17,8 +17,9 @@ use lapin::options::{ BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions, ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions, }; -use lapin::types::{AMQPValue, FieldTable}; +use lapin::types::{AMQPValue, FieldTable, ShortString}; use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind}; +use serde::{Deserialize, Serialize}; use tracing::{debug, trace}; pub fn from_config(cfg: &RabbitMqConfig) -> Result { @@ -82,6 +83,60 @@ impl ChannelExt for Channel { ))?; Ok(()) } + + async fn send_request Deserialize<'a>>( + &mut self, + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, + ) -> Result { + let mut msg = prepare_queue_message(exchange, routing_key, msg); + + let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into(); + let mut props = BasicProperties::default() + .with_reply_to("amq.rabbitmq.reply-to".into()) + .with_correlation_id(correlation_id.clone()) + .with_delivery_mode(2); // do not lose pastebins please + + if let Some(s) = msg.content_type { + props = props.with_content_type(s.into()); + } + + trace!(?exchange, ?routing_key, "sending a RPC request"); + let confirmation = self + .basic_publish( + &msg.exchange.take().unwrap_or_else(|| "".to_owned()), + &msg.routing_key.take().unwrap_or_else(|| "".to_owned()), + BasicPublishOptions::default(), + &msg.content, + props, + ) + .await? + .await?; + + trace!(?confirmation, "RPC request sent"); + + let mut consumer = self + .basic_consume( + "amq.rabbitmq.reply-to".into(), + "whoami", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + while let Some(Ok(deliver)) = consumer.next().await { + debug!(?deliver.delivery_tag, "received an RPC reply"); + if let Some(deliver_correlation_id) = deliver.properties.correlation_id().as_ref() { + if deliver_correlation_id == &correlation_id { + trace!(?deliver_correlation_id, "received the expected RPC reply"); + return Ok(serde_json::from_slice(&deliver.data).unwrap()); + } + } + } + + panic!("did not receive an RPC reply"); + } } impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel { diff --git a/ofborg/src/utils/pastebin.rs b/ofborg/src/utils/pastebin.rs index b63c8a4..8c6d193 100644 --- a/ofborg/src/utils/pastebin.rs +++ b/ofborg/src/utils/pastebin.rs @@ -3,6 +3,9 @@ use std::path::PathBuf; use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine}; +use lapin::Channel; + +use crate::easyamqp::ChannelExt; #[derive(Serialize, Deserialize, Debug)] pub struct Pastebin { @@ -31,14 +34,16 @@ impl From<&Pastebin> for PersistedPastebin { } } -pub fn make_pastebin(title: &str, contents: String) -> Option { +pub async fn make_pastebin( + chan: &mut Channel, + title: &str, + contents: String, +) -> Result { let pastebin = Pastebin { title: title.to_owned(), contents, }; - // wait for a reply? - //async_std::task::block_on(publish_serde_action(pastebin)); - - None + chan.send_request(Some("pastebin-log"), None, &pastebin) + .await } diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index ec63dc7..818134b 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -24,6 +24,24 @@ pub struct QueueMsg { pub content: Vec, } +pub fn prepare_queue_message( + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, +) -> QueueMsg +where + T: Serialize, +{ + QueueMsg { + exchange: exchange.map(|s| s.to_owned()), + routing_key: routing_key.map(|s| s.to_owned()), + mandatory: false, + immediate: false, + content_type: Some("application/json".to_owned()), + content: serde_json::to_string(&msg).unwrap().into_bytes(), + } +} + pub fn publish_serde_action( exchange: Option, routing_key: Option, @@ -32,14 +50,11 @@ pub fn publish_serde_action( where T: Serialize, { - Action::Publish(Box::new(QueueMsg { - exchange, - routing_key, - mandatory: false, - immediate: false, - content_type: Some("application/json".to_owned()), - content: serde_json::to_string(&msg).unwrap().into_bytes(), - })) + Action::Publish(Box::new(prepare_queue_message( + exchange.as_deref(), + routing_key.as_deref(), + msg, + ))) } pub trait SimpleWorker: Send {