set content_type for lapin

This commit is contained in:
Daiderd Jordan 2020-04-29 22:25:33 +02:00
parent 7dc8d407a2
commit 8dfcccb93e
No known key found for this signature in database
GPG key ID: D02435D05B810C96
3 changed files with 12 additions and 13 deletions

View file

@ -85,9 +85,10 @@ impl<W: SimpleWorker + 'static> ConsumerExt<W> for CloseOnDrop<Channel> {
))?;
Ok(Box::pin(async move {
while let Some(Ok(deliver)) = consumer.next().await {
let content_type = deliver.properties.content_type();
let job = worker.msg_to_job(
deliver.routing_key.as_str(),
&None, // TODO content type
&content_type.as_ref().map(|s| s.to_string()),
&deliver.data,
);
@ -134,9 +135,10 @@ impl<W: SimpleNotifyWorker + 'static> ConsumerExt<W> for NotifyChannel {
deliver: &deliver,
};
let content_type = deliver.properties.content_type();
let job = worker.msg_to_job(
deliver.routing_key.as_str(),
&None, // TODO content type
&content_type.as_ref().map(|s| s.to_string()),
&deliver.data,
);

View file

@ -80,9 +80,10 @@ impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> {
let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned());
let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned());
let props = msg.properties.take().unwrap_or(BasicProperties {
let props = BasicProperties {
content_type: msg.content_type,
..Default::default()
});
};
self.channel
.basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
.unwrap();

View file

@ -26,7 +26,7 @@ pub struct QueueMsg {
pub routing_key: Option<String>,
pub mandatory: bool,
pub immediate: bool,
pub properties: Option<BasicProperties>,
pub content_type: Option<String>,
pub content: Vec<u8>,
}
@ -38,17 +38,12 @@ pub fn publish_serde_action<T: ?Sized>(
where
T: Serialize,
{
let props = BasicProperties {
content_type: Some("application/json".to_owned()),
..Default::default()
};
Action::Publish(Box::new(QueueMsg {
exchange,
routing_key,
mandatory: false,
immediate: false,
properties: Some(props),
content_type: Some("application/json".to_owned()),
content: serde_json::to_string(&msg).unwrap().into_bytes(),
}))
}
@ -107,9 +102,10 @@ impl<T: SimpleWorker + Send> amqp::Consumer for Worker<T> {
let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned());
let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned());
let props = msg.properties.take().unwrap_or(BasicProperties {
let props = BasicProperties {
content_type: msg.content_type,
..Default::default()
});
};
channel
.basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content)
.unwrap();