feat: add pastebin internal RPC API

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-01 13:24:17 +01:00
parent b4f5dbe1b4
commit 326cf58e14
4 changed files with 98 additions and 15 deletions

View file

@ -1,3 +1,5 @@
use serde::{Deserialize, Serialize};
pub struct ConsumeConfig {
/// Specifies the name of the queue to consume from.
pub queue: String,
@ -263,6 +265,12 @@ pub trait ChannelExt {
fn declare_exchange(&mut self, config: ExchangeConfig) -> Result<(), Self::Error>;
fn declare_queue(&mut self, config: QueueConfig) -> Result<(), Self::Error>;
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
async fn send_request<T: ?Sized + Serialize, U: for<'a> Deserialize<'a>>(
&mut self,
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> Result<U, Self::Error>;
}
pub trait ConsumerExt<'a, C> {

View file

@ -7,7 +7,7 @@ use crate::easyamqp::{
};
use crate::notifyworker::{NotificationReceiver, SimpleNotifyWorker};
use crate::ofborg;
use crate::worker::{Action, SimpleWorker};
use crate::worker::{prepare_queue_message, Action, SimpleWorker};
use async_std::future::Future;
use async_std::stream::StreamExt;
@ -17,8 +17,9 @@ use lapin::options::{
BasicAckOptions, BasicConsumeOptions, BasicNackOptions, BasicPublishOptions, BasicQosOptions,
ExchangeDeclareOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::{AMQPValue, FieldTable};
use lapin::types::{AMQPValue, FieldTable, ShortString};
use lapin::{BasicProperties, Channel, Connection, ConnectionProperties, ExchangeKind};
use serde::{Deserialize, Serialize};
use tracing::{debug, trace};
pub fn from_config(cfg: &RabbitMqConfig) -> Result<Connection, lapin::Error> {
@ -82,6 +83,60 @@ impl ChannelExt for Channel {
))?;
Ok(())
}
async fn send_request<T: ?Sized + Serialize, U: for<'a> Deserialize<'a>>(
&mut self,
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> Result<U, Self::Error> {
let mut msg = prepare_queue_message(exchange, routing_key, msg);
let correlation_id: ShortString = format!("{}", uuid::Uuid::new_v4().as_simple()).into();
let mut props = BasicProperties::default()
.with_reply_to("amq.rabbitmq.reply-to".into())
.with_correlation_id(correlation_id.clone())
.with_delivery_mode(2); // do not lose pastebins please
if let Some(s) = msg.content_type {
props = props.with_content_type(s.into());
}
trace!(?exchange, ?routing_key, "sending a RPC request");
let confirmation = self
.basic_publish(
&msg.exchange.take().unwrap_or_else(|| "".to_owned()),
&msg.routing_key.take().unwrap_or_else(|| "".to_owned()),
BasicPublishOptions::default(),
&msg.content,
props,
)
.await?
.await?;
trace!(?confirmation, "RPC request sent");
let mut consumer = self
.basic_consume(
"amq.rabbitmq.reply-to".into(),
"whoami",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
while let Some(Ok(deliver)) = consumer.next().await {
debug!(?deliver.delivery_tag, "received an RPC reply");
if let Some(deliver_correlation_id) = deliver.properties.correlation_id().as_ref() {
if deliver_correlation_id == &correlation_id {
trace!(?deliver_correlation_id, "received the expected RPC reply");
return Ok(serde_json::from_slice(&deliver.data).unwrap());
}
}
}
panic!("did not receive an RPC reply");
}
}
impl<'a, W: SimpleWorker + 'a> ConsumerExt<'a, W> for Channel {

View file

@ -3,6 +3,9 @@
use std::path::PathBuf;
use base64::{prelude::BASE64_URL_SAFE_NO_PAD, Engine};
use lapin::Channel;
use crate::easyamqp::ChannelExt;
#[derive(Serialize, Deserialize, Debug)]
pub struct Pastebin {
@ -31,14 +34,16 @@ impl From<&Pastebin> for PersistedPastebin {
}
}
pub fn make_pastebin(title: &str, contents: String) -> Option<PersistedPastebin> {
pub async fn make_pastebin(
chan: &mut Channel,
title: &str,
contents: String,
) -> Result<PersistedPastebin, lapin::Error> {
let pastebin = Pastebin {
title: title.to_owned(),
contents,
};
// wait for a reply?
//async_std::task::block_on(publish_serde_action(pastebin));
None
chan.send_request(Some("pastebin-log"), None, &pastebin)
.await
}

View file

@ -24,6 +24,24 @@ pub struct QueueMsg {
pub content: Vec<u8>,
}
pub fn prepare_queue_message<T: ?Sized>(
exchange: Option<&str>,
routing_key: Option<&str>,
msg: &T,
) -> QueueMsg
where
T: Serialize,
{
QueueMsg {
exchange: exchange.map(|s| s.to_owned()),
routing_key: routing_key.map(|s| s.to_owned()),
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}
}
pub fn publish_serde_action<T: ?Sized>(
exchange: Option<String>,
routing_key: Option<String>,
@ -32,14 +50,11 @@ pub fn publish_serde_action<T: ?Sized>(
where
T: Serialize,
{
Action::Publish(Box::new(QueueMsg {
exchange,
routing_key,
mandatory: false,
immediate: false,
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}))
Action::Publish(Box::new(prepare_queue_message(
exchange.as_deref(),
routing_key.as_deref(),
msg,
)))
}
pub trait SimpleWorker: Send {