generalize SimpleWorker trait

This commit is contained in:
Daiderd Jordan 2020-04-28 19:02:55 +02:00
parent ed4fcea227
commit 3722158a05
No known key found for this signature in database
GPG key ID: D02435D05B810C96
15 changed files with 32 additions and 56 deletions

View file

@ -81,7 +81,7 @@ fn main() {
}) })
.unwrap(); .unwrap();
channel let mut channel = channel
.consume( .consume(
notifyworker::new(tasks::build::BuildWorker::new( notifyworker::new(tasks::build::BuildWorker::new(
cloner, cloner,

View file

@ -62,7 +62,7 @@ fn main() {
.unwrap(); .unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(tasks::evaluationfilter::EvaluationFilterWorker::new( worker::new(tasks::evaluationfilter::EvaluationFilterWorker::new(
cfg.acl(), cfg.acl(),

View file

@ -62,7 +62,7 @@ fn main() {
.unwrap(); .unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new( worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new(
cfg.acl(), cfg.acl(),

View file

@ -47,7 +47,7 @@ fn main() {
.unwrap(); .unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(tasks::githubcommentposter::GitHubCommentPoster::new( worker::new(tasks::githubcommentposter::GitHubCommentPoster::new(
cfg.github_app_vendingmachine(), cfg.github_app_vendingmachine(),

View file

@ -49,7 +49,7 @@ fn main() {
}) })
.unwrap(); .unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(tasks::log_message_collector::LogMessageCollector::new( worker::new(tasks::log_message_collector::LogMessageCollector::new(
PathBuf::from(cfg.log_storage.clone().unwrap().path), PathBuf::from(cfg.log_storage.clone().unwrap().path),

View file

@ -65,7 +65,7 @@ fn main() {
.unwrap(); .unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(mrw), worker::new(mrw),
easyamqp::ConsumeConfig { easyamqp::ConsumeConfig {

View file

@ -59,7 +59,7 @@ fn main() {
.unwrap(); .unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel let mut channel = channel
.consume( .consume(
worker::new(collector), worker::new(collector),
easyamqp::ConsumeConfig { easyamqp::ConsumeConfig {

View file

@ -303,9 +303,10 @@ pub trait ChannelExt {
fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>; fn bind_queue(&mut self, config: BindQueueConfig) -> Result<(), Self::Error>;
} }
pub trait ConsumerExt<T> { pub trait ConsumerExt<C> {
type Error; type Error;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error>; type Handle;
fn consume(self, callback: C, config: ConsumeConfig) -> Result<Self::Handle, Self::Error>;
} }
impl ChannelExt for amqp::Channel { impl ChannelExt for amqp::Channel {
@ -350,10 +351,11 @@ impl ChannelExt for amqp::Channel {
} }
} }
impl<T: amqp::Consumer + 'static> ConsumerExt<T> for amqp::Channel { impl<C: amqp::Consumer + 'static> ConsumerExt<C> for amqp::Channel {
type Error = amqp::AMQPError; type Error = amqp::AMQPError;
type Handle = Self;
fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result<(), Self::Error> { fn consume(mut self, callback: C, config: ConsumeConfig) -> Result<Self::Handle, Self::Error> {
self.basic_consume( self.basic_consume(
callback, callback,
config.queue, config.queue,
@ -364,6 +366,6 @@ impl<T: amqp::Consumer + 'static> ConsumerExt<T> for amqp::Channel {
config.no_wait, config.no_wait,
amqp::Table::new(), amqp::Table::new(),
)?; )?;
Ok(()) Ok(self)
} }
} }

View file

@ -11,7 +11,6 @@ use crate::systems;
use crate::tasks::eval; use crate::tasks::eval;
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
use hubcaps::checks::CheckRunOptions; use hubcaps::checks::CheckRunOptions;
use hubcaps::gists::Gists; use hubcaps::gists::Gists;
use hubcaps::issues::Issue; use hubcaps::issues::Issue;
@ -60,12 +59,7 @@ impl<E: stats::SysEvents> EvaluationWorker<E> {
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E> { impl<E: stats::SysEvents + 'static> worker::SimpleWorker for EvaluationWorker<E> {
type J = evaluationjob::EvaluationJob; type J = evaluationjob::EvaluationJob;
fn msg_to_job( fn msg_to_job(&mut self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &[u8],
) -> Result<Self::J, String> {
self.events.notify(Event::JobReceived); self.events.notify(Event::JobReceived);
match evaluationjob::from(body) { match evaluationjob::from(body) {
Ok(e) => { Ok(e) => {

View file

@ -3,8 +3,6 @@ use crate::ghevent;
use crate::message::{evaluationjob, Pr, Repo}; use crate::message::{evaluationjob, Pr, Repo};
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
pub struct EvaluationFilterWorker { pub struct EvaluationFilterWorker {
acl: acl::ACL, acl: acl::ACL,
} }
@ -20,8 +18,8 @@ impl worker::SimpleWorker for EvaluationFilterWorker {
fn msg_to_job( fn msg_to_job(
&mut self, &mut self,
_: &Deliver, _: &str,
_: &BasicProperties, _: &Option<String>,
body: &[u8], body: &[u8],
) -> Result<Self::J, String> { ) -> Result<Self::J, String> {
match serde_json::from_slice(body) { match serde_json::from_slice(body) {

View file

@ -4,7 +4,6 @@ use crate::ghevent;
use crate::message::{buildjob, evaluationjob, Pr, Repo}; use crate::message::{buildjob, evaluationjob, Pr, Repo};
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
use uuid::Uuid; use uuid::Uuid;
pub struct GitHubCommentWorker { pub struct GitHubCommentWorker {
@ -21,12 +20,7 @@ impl GitHubCommentWorker {
impl worker::SimpleWorker for GitHubCommentWorker { impl worker::SimpleWorker for GitHubCommentWorker {
type J = ghevent::IssueComment; type J = ghevent::IssueComment;
fn msg_to_job( fn msg_to_job(&mut self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) { match serde_json::from_slice(body) {
Ok(e) => Ok(e), Ok(e) => Ok(e),
Err(e) => { Err(e) => {

View file

@ -4,7 +4,6 @@ use crate::message::buildresult::{BuildResult, BuildStatus, LegacyBuildResult};
use crate::message::Repo; use crate::message::Repo;
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use hubcaps::checks::{CheckRunOptions, CheckRunState, Conclusion, Output}; use hubcaps::checks::{CheckRunOptions, CheckRunState, Conclusion, Output};
@ -42,12 +41,7 @@ impl PostableEvent {
impl worker::SimpleWorker for GitHubCommentPoster { impl worker::SimpleWorker for GitHubCommentPoster {
type J = PostableEvent; type J = PostableEvent;
fn msg_to_job( fn msg_to_job(&mut self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &[u8],
) -> Result<Self::J, String> {
PostableEvent::from(body) PostableEvent::from(body)
} }

View file

@ -3,7 +3,6 @@ use crate::message::buildresult::BuildResult;
use crate::worker; use crate::worker;
use crate::writetoline::LineWriter; use crate::writetoline::LineWriter;
use amqp::protocol::basic::{BasicProperties, Deliver};
use lru_cache::LruCache; use lru_cache::LruCache;
use std::fs::{self, File, OpenOptions}; use std::fs::{self, File, OpenOptions};
@ -174,8 +173,8 @@ impl worker::SimpleWorker for LogMessageCollector {
fn msg_to_job( fn msg_to_job(
&mut self, &mut self,
deliver: &Deliver, routing_key: &str,
_: &BasicProperties, _: &Option<String>,
body: &[u8], body: &[u8],
) -> Result<Self::J, String> { ) -> Result<Self::J, String> {
let message: MsgType; let message: MsgType;
@ -203,7 +202,7 @@ impl worker::SimpleWorker for LogMessageCollector {
Ok(LogMessage { Ok(LogMessage {
from: LogFrom { from: LogFrom {
routing_key: deliver.routing_key.clone(), routing_key: routing_key.to_string(),
attempt_id, attempt_id,
}, },
message, message,

View file

@ -1,8 +1,6 @@
use crate::stats; use crate::stats;
use crate::worker; use crate::worker;
use amqp::protocol::basic::{BasicProperties, Deliver};
pub struct StatCollectorWorker<E> { pub struct StatCollectorWorker<E> {
events: E, events: E,
collector: stats::MetricCollector, collector: stats::MetricCollector,
@ -17,12 +15,7 @@ impl<E: stats::SysEvents + 'static> StatCollectorWorker<E> {
impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> { impl<E: stats::SysEvents + 'static> worker::SimpleWorker for StatCollectorWorker<E> {
type J = stats::EventMessage; type J = stats::EventMessage;
fn msg_to_job( fn msg_to_job(&mut self, _: &str, _: &Option<String>, body: &[u8]) -> Result<Self::J, String> {
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &[u8],
) -> Result<Self::J, String> {
match serde_json::from_slice(body) { match serde_json::from_slice(body) {
Ok(e) => Ok(e), Ok(e) => Ok(e),
Err(_) => { Err(_) => {

View file

@ -1,5 +1,5 @@
use amqp::protocol::basic::{BasicProperties, Deliver}; use amqp::protocol::basic::{BasicProperties, Deliver};
use amqp::{Basic, Channel, Consumer}; use amqp::Basic;
use serde::Serialize; use serde::Serialize;
use std::marker::Send; use std::marker::Send;
@ -53,15 +53,15 @@ where
})) }))
} }
pub trait SimpleWorker: Send + 'static { pub trait SimpleWorker: Send {
type J: Send; type J: Send;
fn consumer(&mut self, job: &Self::J) -> Actions; fn consumer(&mut self, job: &Self::J) -> Actions;
fn msg_to_job( fn msg_to_job(
&mut self, &mut self,
method: &Deliver, method: &str,
headers: &BasicProperties, headers: &Option<String>,
body: &[u8], body: &[u8],
) -> Result<Self::J, String>; ) -> Result<Self::J, String>;
} }
@ -70,15 +70,17 @@ pub fn new<T: SimpleWorker>(worker: T) -> Worker<T> {
Worker { internal: worker } Worker { internal: worker }
} }
impl<T: SimpleWorker + Send> Consumer for Worker<T> { impl<T: SimpleWorker + Send> amqp::Consumer for Worker<T> {
fn handle_delivery( fn handle_delivery(
&mut self, &mut self,
channel: &mut Channel, channel: &mut amqp::Channel,
method: Deliver, method: Deliver,
headers: BasicProperties, headers: BasicProperties,
body: Vec<u8>, body: Vec<u8>,
) { ) {
let job = self.internal.msg_to_job(&method, &headers, &body); let job = self
.internal
.msg_to_job(&method.routing_key, &headers.content_type, &body);
if let Err(e) = job { if let Err(e) = job {
error!("Error decoding job: {:?}", e); error!("Error decoding job: {:?}", e);