diff --git a/ofborg/src/utils/pastebin.rs b/ofborg/src/utils/pastebin.rs index b63c8a4..4ac94d7 100644 --- a/ofborg/src/utils/pastebin.rs +++ b/ofborg/src/utils/pastebin.rs @@ -2,7 +2,16 @@ use std::path::PathBuf; +use async_std::stream::StreamExt; use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine}; +use lapin::{ + options::{BasicConsumeOptions, BasicPublishOptions}, + types::{FieldTable, ShortString}, + BasicProperties, Channel, +}; +use tracing::{debug, trace}; + +use crate::worker::prepare_queue_message; #[derive(Serialize, Deserialize, Debug)] pub struct Pastebin { @@ -31,14 +40,61 @@ impl From<&Pastebin> for PersistedPastebin { } } -pub fn make_pastebin(title: &str, contents: String) -> Option { +pub async fn make_pastebin( + chan: &mut Channel, + title: &str, + contents: String, +) -> Result { let pastebin = Pastebin { title: title.to_owned(), contents, }; - // wait for a reply? - //async_std::task::block_on(publish_serde_action(pastebin)); + let mut msg = prepare_queue_message(Some("pastebin-log"), None, &pastebin); + // publish with a correlation ID and a reply-to the pastebin event queue + let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into(); + let mut props = BasicProperties::default() + .with_reply_to("persisted-pastebins".into()) + .with_correlation_id(correlation_id.clone()) + .with_delivery_mode(2); // do not lose pastebins please - None + if let Some(s) = msg.content_type { + props = props.with_content_type(s.into()); + } + + trace!(?title, "creating a pastebin"); + let confirmation = chan + .basic_publish( + &msg.exchange.take().unwrap_or_else(|| "".to_owned()), + &msg.routing_key.take().unwrap_or_else(|| "".to_owned()), + BasicPublishOptions::default(), + &msg.content, + props, + ) + .await? + .await?; + + trace!(?confirmation, "pastebin created"); + let mut consumer = chan + .basic_consume( + "persisted-pastebins", + "adhoc-pastebin", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await?; + + let mut persisted_pastebin: Option = None; + // TODO: add timeout here. + // FIXME: if we timeout, increase the number of errors for stats? + while let Some(Ok(deliver)) = consumer.next().await { + debug!(?deliver.delivery_tag, "received a persisted pastebin"); + if deliver.properties.correlation_id().as_ref().unwrap() == &correlation_id { + persisted_pastebin = Some(serde_json::from_slice(&deliver.data).unwrap()); + break; + } + } + + // return the persisted pastebin + persisted_pastebin.ok_or_else(|| lapin::Error::InvalidChannel(1)) } diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index ec63dc7..818134b 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -24,6 +24,24 @@ pub struct QueueMsg { pub content: Vec, } +pub fn prepare_queue_message( + exchange: Option<&str>, + routing_key: Option<&str>, + msg: &T, +) -> QueueMsg +where + T: Serialize, +{ + QueueMsg { + exchange: exchange.map(|s| s.to_owned()), + routing_key: routing_key.map(|s| s.to_owned()), + mandatory: false, + immediate: false, + content_type: Some("application/json".to_owned()), + content: serde_json::to_string(&msg).unwrap().into_bytes(), + } +} + pub fn publish_serde_action( exchange: Option, routing_key: Option, @@ -32,14 +50,11 @@ pub fn publish_serde_action( where T: Serialize, { - Action::Publish(Box::new(QueueMsg { - exchange, - routing_key, - mandatory: false, - immediate: false, - content_type: Some("application/json".to_owned()), - content: serde_json::to_string(&msg).unwrap().into_bytes(), - })) + Action::Publish(Box::new(prepare_queue_message( + exchange.as_deref(), + routing_key.as_deref(), + msg, + ))) } pub trait SimpleWorker: Send {