generalize SimpleNotifyWorker trait

This commit is contained in:
Daiderd Jordan 2020-04-28 20:18:21 +02:00
parent 3722158a05
commit 5a75be23ca
No known key found for this signature in database
GPG key ID: D02435D05B810C96
2 changed files with 15 additions and 10 deletions

View file

@ -1,7 +1,7 @@
use crate::worker::Action; use crate::worker::Action;
use amqp::protocol::basic::{BasicProperties, Deliver}; use amqp::protocol::basic::{BasicProperties, Deliver};
use amqp::{Basic, Channel, Consumer}; use amqp::Basic;
use std::marker::Send; use std::marker::Send;
@ -16,8 +16,8 @@ pub trait SimpleNotifyWorker {
fn msg_to_job( fn msg_to_job(
&self, &self,
method: &Deliver, routing_key: &str,
headers: &BasicProperties, content_type: &Option<String>,
body: &[u8], body: &[u8],
) -> Result<Self::J, String>; ) -> Result<Self::J, String>;
} }
@ -44,12 +44,15 @@ impl NotificationReceiver for DummyNotificationReceiver {
} }
pub struct ChannelNotificationReceiver<'a> { pub struct ChannelNotificationReceiver<'a> {
channel: &'a mut Channel, channel: &'a mut amqp::Channel,
delivery_tag: u64, delivery_tag: u64,
} }
impl<'a> ChannelNotificationReceiver<'a> { impl<'a> ChannelNotificationReceiver<'a> {
pub fn new(channel: &'a mut Channel, delivery_tag: u64) -> ChannelNotificationReceiver<'a> { pub fn new(
channel: &'a mut amqp::Channel,
delivery_tag: u64,
) -> ChannelNotificationReceiver<'a> {
ChannelNotificationReceiver { ChannelNotificationReceiver {
channel, channel,
delivery_tag, delivery_tag,
@ -92,17 +95,20 @@ pub fn new<T: SimpleNotifyWorker>(worker: T) -> NotifyWorker<T> {
NotifyWorker { internal: worker } NotifyWorker { internal: worker }
} }
impl<T: SimpleNotifyWorker + Send> Consumer for NotifyWorker<T> { impl<T: SimpleNotifyWorker + Send> amqp::Consumer for NotifyWorker<T> {
fn handle_delivery( fn handle_delivery(
&mut self, &mut self,
channel: &mut Channel, channel: &mut amqp::Channel,
method: Deliver, method: Deliver,
headers: BasicProperties, headers: BasicProperties,
body: Vec<u8>, body: Vec<u8>,
) { ) {
let mut receiver = ChannelNotificationReceiver::new(channel, method.delivery_tag); let mut receiver = ChannelNotificationReceiver::new(channel, method.delivery_tag);
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); let job = self
.internal
.msg_to_job(&method.routing_key, &headers.content_type, &body)
.unwrap();
self.internal.consumer(&job, &mut receiver); self.internal.consumer(&job, &mut receiver);
} }
} }

View file

@ -6,7 +6,6 @@ use crate::nix;
use crate::notifyworker; use crate::notifyworker;
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
use uuid::Uuid; use uuid::Uuid;
use std::collections::VecDeque; use std::collections::VecDeque;
@ -260,7 +259,7 @@ impl<'a, 'b> JobActions<'a, 'b> {
impl notifyworker::SimpleNotifyWorker for BuildWorker { impl notifyworker::SimpleNotifyWorker for BuildWorker {
type J = buildjob::BuildJob; type J = buildjob::BuildJob;
fn msg_to_job(&self, _: &Deliver, _: &BasicProperties, body: &[u8]) -> Result<Self::J, String> { fn msg_to_job(&self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
info!("lmao I got a job?"); info!("lmao I got a job?");
match buildjob::from(body) { match buildjob::from(body) {
Ok(e) => Ok(e), Ok(e) => Ok(e),