From 8dfcccb93e247e1996f2983593ae6ce8b88e4e48 Mon Sep 17 00:00:00 2001 From: Daiderd Jordan Date: Wed, 29 Apr 2020 22:25:33 +0200 Subject: [PATCH] set content_type for lapin --- ofborg/src/easylapin.rs | 6 ++++-- ofborg/src/notifyworker.rs | 5 +++-- ofborg/src/worker.rs | 14 +++++--------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/ofborg/src/easylapin.rs b/ofborg/src/easylapin.rs index 4cbe852..f435ffa 100644 --- a/ofborg/src/easylapin.rs +++ b/ofborg/src/easylapin.rs @@ -85,9 +85,10 @@ impl ConsumerExt for CloseOnDrop { ))?; Ok(Box::pin(async move { while let Some(Ok(deliver)) = consumer.next().await { + let content_type = deliver.properties.content_type(); let job = worker.msg_to_job( deliver.routing_key.as_str(), - &None, // TODO content type + &content_type.as_ref().map(|s| s.to_string()), &deliver.data, ); @@ -134,9 +135,10 @@ impl ConsumerExt for NotifyChannel { deliver: &deliver, }; + let content_type = deliver.properties.content_type(); let job = worker.msg_to_job( deliver.routing_key.as_str(), - &None, // TODO content type + &content_type.as_ref().map(|s| s.to_string()), &deliver.data, ); diff --git a/ofborg/src/notifyworker.rs b/ofborg/src/notifyworker.rs index a86f851..aeb8842 100644 --- a/ofborg/src/notifyworker.rs +++ b/ofborg/src/notifyworker.rs @@ -80,9 +80,10 @@ impl<'a> NotificationReceiver for ChannelNotificationReceiver<'a> { let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned()); let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned()); - let props = msg.properties.take().unwrap_or(BasicProperties { + let props = BasicProperties { + content_type: msg.content_type, ..Default::default() - }); + }; self.channel .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content) .unwrap(); diff --git a/ofborg/src/worker.rs b/ofborg/src/worker.rs index 51c83c8..7aae8fa 100644 --- a/ofborg/src/worker.rs +++ b/ofborg/src/worker.rs @@ -26,7 +26,7 @@ pub struct QueueMsg { pub routing_key: Option, pub mandatory: bool, pub immediate: bool, - pub properties: Option, + pub content_type: Option, pub content: Vec, } @@ -38,17 +38,12 @@ pub fn publish_serde_action( where T: Serialize, { - let props = BasicProperties { - content_type: Some("application/json".to_owned()), - ..Default::default() - }; - Action::Publish(Box::new(QueueMsg { exchange, routing_key, mandatory: false, immediate: false, - properties: Some(props), + content_type: Some("application/json".to_owned()), content: serde_json::to_string(&msg).unwrap().into_bytes(), })) } @@ -107,9 +102,10 @@ impl amqp::Consumer for Worker { let exch = msg.exchange.take().unwrap_or_else(|| "".to_owned()); let key = msg.routing_key.take().unwrap_or_else(|| "".to_owned()); - let props = msg.properties.take().unwrap_or(BasicProperties { + let props = BasicProperties { + content_type: msg.content_type, ..Default::default() - }); + }; channel .basic_publish(exch, key, msg.mandatory, msg.immediate, props, msg.content) .unwrap();