feat: add pastebin internal RPC API

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-01 13:24:17 +01:00
parent 4dd5802501
commit 050c96ea83
2 changed files with 83 additions and 12 deletions

View file

@ -2,7 +2,16 @@
use std::path::PathBuf;
use async_std::stream::StreamExt;
use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
use lapin::{
options::{BasicConsumeOptions, BasicPublishOptions},
types::{FieldTable, ShortString},
BasicProperties, Channel,
};
use tracing::{debug, trace};
use crate::worker::prepare_queue_message;
#[derive(Serialize, Deserialize, Debug)]
pub struct Pastebin {
@ -31,14 +40,61 @@ impl From<&Pastebin> for PersistedPastebin {
}
}
pub fn make_pastebin(title: &str, contents: String) -> Option<PersistedPastebin> {
pub async fn make_pastebin(
chan: &mut Channel,
title: &str,
contents: String,
) -> Result<PersistedPastebin, lapin::Error> {
let pastebin = Pastebin {
title: title.to_owned(),
contents,
};
// wait for a reply?
//async_std::task::block_on(publish_serde_action(pastebin));
let mut msg = prepare_queue_message(Some("pastebin-log"), None, &pastebin);
// publish with a correlation ID and a reply-to the pastebin event queue
let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into();
let mut props = BasicProperties::default()
.with_reply_to("persisted-pastebins".into())
.with_correlation_id(correlation_id.clone())
.with_delivery_mode(2); // do not lose pastebins please
None
if let Some(s) = msg.content_type {
props = props.with_content_type(s.into());
}
trace!(?title, "creating a pastebin");
let confirmation = chan
.basic_publish(
&msg.exchange.take().unwrap_or_else(|| "".to_owned()),
&msg.routing_key.take().unwrap_or_else(|| "".to_owned()),
BasicPublishOptions::default(),
&msg.content,
props,
)
.await?
.await?;
trace!(?confirmation, "pastebin created");
let mut consumer = chan
.basic_consume(
"persisted-pastebins",
"adhoc-pastebin",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
let mut persisted_pastebin: Option<PersistedPastebin> = None;
// TODO: add timeout here.
// FIXME: if we timeout, increase the number of errors for stats?
while let Some(Ok(deliver)) = consumer.next().await {
debug!(?deliver.delivery_tag, "received a persisted pastebin");
if deliver.properties.correlation_id().as_ref().unwrap() == &correlation_id {
persisted_pastebin = Some(serde_json::from_slice(&deliver.data).unwrap());
break;
}
}
// return the persisted pastebin
persisted_pastebin.ok_or_else(|| lapin::Error::InvalidChannel(1))
}

View file

@ -24,6 +24,24 @@ pub struct QueueMsg {
pub content: Vec<u8>,
}
pub fn prepare_queue_message<T: ?Sized>(
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> QueueMsg
where
T: Serialize,
{
QueueMsg {
exchange: exchange.map(|s| s.to_owned()),
routing_key: routing_key.map(|s| s.to_owned()),
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}
}
pub fn publish_serde_action<T: ?Sized>(
exchange: Option<String>,
routing_key: Option<String>,
@ -32,14 +50,11 @@ pub fn publish_serde_action<T: ?Sized>(
where
T: Serialize,
{
Action::Publish(Box::new(QueueMsg {
exchange,
routing_key,
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}))
Action::Publish(Box::new(prepare_queue_message(
exchange.as_deref(),
routing_key.as_deref(),
msg,
)))
}
pub trait SimpleWorker: Send {