From f9b7aace99d5e793e08163f43f0154c8ae99d8e7 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 09:42:05 -0500 Subject: [PATCH 01/10] amqp: update to support custom client properties --- nix/ofborg-carnix.nix | 4 ++-- ofborg/Cargo.lock | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nix/ofborg-carnix.nix b/nix/ofborg-carnix.nix index 21748b9..a39cb3d 100644 --- a/nix/ofborg-carnix.nix +++ b/nix/ofborg-carnix.nix @@ -49,8 +49,8 @@ let kernel = buildPlatform.parsed.kernel.name; authors = [ "Andrii Dmytrenko " ]; src = fetchgit { url = "https://github.com/grahamc/rust-amqp.git"; - rev = "b3eae6b0b8458d32134581c53a918e6e52c7c329"; - sha256 = "01bma6ig0c7gvrj9b4jj3h977vg1mij9a9hzvvp6an06d74zfk4w"; + rev = "f9aec2f40aef69a459f26003ce47048f8e2a08d1"; + sha256 = "09k6fl7l0rcwilnckdfv3smiv1ilrwi1jxmrrkjwbrj64lky3jdy"; }; inherit dependencies buildDependencies features; }; diff --git a/ofborg/Cargo.lock b/ofborg/Cargo.lock index 5f720ce..9a10e1d 100644 --- a/ofborg/Cargo.lock +++ b/ofborg/Cargo.lock @@ -30,7 +30,7 @@ dependencies = [ [[package]] name = "amqp" version = "0.1.0" -source = "git+https://github.com/grahamc/rust-amqp.git#b3eae6b0b8458d32134581c53a918e6e52c7c329" +source = "git+https://github.com/grahamc/rust-amqp.git#f9aec2f40aef69a459f26003ce47048f8e2a08d1" dependencies = [ "amq-proto 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", From 60a8081daa91a2327dd3752835af66a2124f15be Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 16:45:17 -0500 Subject: [PATCH 02/10] fix typo in the web ingestion --- php/web/index.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/php/web/index.php b/php/web/index.php index a26b5e7..ecf8079 100644 --- a/php/web/index.php +++ b/php/web/index.php @@ -129,7 +129,7 @@ try { } if (!isset($input->repository)) { - throw new\ExecutionFailureException('Dataset does not have a repository'); + throw new ExecutionFailureException('Dataset does not have a repository'); } if (!isset($input->repository->full_name)) { From cb18c0d5b14c09c14aa27e82955bf9c70760eeee Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 17:25:56 -0500 Subject: [PATCH 03/10] add a typed wrapper to amqp to avoid false, false, false, false --- ofborg/src/bin/builder.rs | 49 ++++++++++++----------------- ofborg/src/config.rs | 3 +- ofborg/src/easyamqp.rs | 66 +++++++++++++++++++++++++++++++++++++++ ofborg/src/lib.rs | 4 ++- 4 files changed, 90 insertions(+), 32 deletions(-) create mode 100644 ofborg/src/easyamqp.rs diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index a291ce2..2b342d8 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -9,13 +9,12 @@ use std::env; use std::path::Path; use amqp::Basic; -use amqp::Session; -use amqp::Table; - use ofborg::config; use ofborg::checkout; use ofborg::notifyworker; use ofborg::tasks; +use ofborg::easyamqp; +use ofborg::easyamqp::TypedWrappers; fn main() { @@ -23,36 +22,29 @@ fn main() { ofborg::setup_log(); - println!("Hello, world!"); - - - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); - println!("Connected to rabbitmq"); - - let mut channel = session.open_channel(1).unwrap(); - let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let nix = cfg.nix(); - - - let full_logs: bool; - match &cfg.feedback { + let full_logs: bool = match &cfg.feedback { &Some(ref feedback) => { - full_logs = feedback.full_logs; + feedback.full_logs } &None => { warn!("Please define feedback.full_logs in your configuration to true or false!"); warn!("feedback.full_logs when true will cause the full build log to be sent back"); warn!("to the server, and be viewable by everyone."); warn!("I strongly encourage everybody turn this on!"); - full_logs = false; + false } - } + }; + + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); + let mut channel = session.open_channel(1).unwrap(); channel.basic_prefetch(1).unwrap(); + channel - .basic_consume( + .consume( notifyworker::new(tasks::build::BuildWorker::new( cloner, nix, @@ -60,20 +52,19 @@ fn main() { cfg.runner.identity.clone(), full_logs, )), - format!("build-inputs-{}", cfg.nix.system.clone()).as_ref(), - format!("{}-builder", cfg.whoami()).as_ref(), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: format!("build-inputs-{}", cfg.nix.system.clone()), + consumer_tag: format!("{}-builder", cfg.whoami()), + no_local: false, + no_ack: false, + exclusive: false, + nowait: false, + arguments: None + }, ) .unwrap(); channel.start_consuming(); - - println!("Finished consuming?"); - channel.close(200, "Bye").unwrap(); println!("Closed the channel"); session.close(200, "Good Bye"); diff --git a/ofborg/src/config.rs b/ofborg/src/config.rs index 5671ee5..a77a159 100644 --- a/ofborg/src/config.rs +++ b/ofborg/src/config.rs @@ -27,7 +27,7 @@ pub struct FeedbackConfig { pub full_logs: bool, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct RabbitMQConfig { pub ssl: bool, pub host: String, @@ -107,7 +107,6 @@ impl Config { } } - impl RabbitMQConfig { pub fn as_uri(&self) -> String { return format!( diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs new file mode 100644 index 0000000..bf0354c --- /dev/null +++ b/ofborg/src/easyamqp.rs @@ -0,0 +1,66 @@ + +use ofborg; +use ofborg::config::RabbitMQConfig; +use amqp; +use amqp::Basic; + +pub struct ConsumeConfig { + pub queue: String, + pub consumer_tag: String, + pub no_local: bool, + pub no_ack: bool, + pub exclusive: bool, + pub nowait: bool, + pub arguments: Option, +} + +pub fn session_from_config(config: &RabbitMQConfig) + -> Result { + let scheme = if config.ssl { + amqp::AMQPScheme::AMQPS + } else { + amqp::AMQPScheme::AMQP + }; + + let mut properties = amqp::Table::new(); + // properties.insert("identity".to_owned(), amqp::TableEntry::LongString(identity.to_owned())); + properties.insert( + "ofborg_version".to_owned(), + amqp::TableEntry::LongString(ofborg::VERSION.to_owned()) + ); + + amqp::Session::new( + amqp::Options{ + host: config.host.clone(), + login: config.username.clone(), + password: config.password.clone(), + scheme: scheme, + properties: properties, + .. amqp::Options::default() + } + ) +} + +pub trait TypedWrappers { + fn consume(&mut self, callback: T, config: ConsumeConfig) + -> Result + where T: amqp::Consumer + 'static; +} + +impl TypedWrappers for amqp::Channel { + fn consume(&mut self, callback: T, config: ConsumeConfig) + -> Result + where T: amqp::Consumer + 'static + { + self.basic_consume( + callback, + config.queue, + config.consumer_tag, + config.no_local, + config.no_ack, + config.exclusive, + config.nowait, + config.arguments.unwrap_or(amqp::Table::new()), + ) + } +} diff --git a/ofborg/src/lib.rs b/ofborg/src/lib.rs index 306c44b..bda70db 100644 --- a/ofborg/src/lib.rs +++ b/ofborg/src/lib.rs @@ -40,6 +40,7 @@ pub mod asynccmd; pub mod notifyworker; pub mod writetoline; pub mod test_scratch; +pub mod easyamqp; pub mod ofborg { pub use asynccmd; @@ -62,8 +63,9 @@ pub mod ofborg { pub use tagger; pub use writetoline; pub use test_scratch; + pub use easyamqp; - + pub const VERSION: &'static str = env!("CARGO_PKG_VERSION"); } pub fn setup_log() { From c0cab43513d2957172c1da1b18052b2f9328e530 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 20:30:46 -0500 Subject: [PATCH 04/10] fixup --- ofborg/src/bin/builder.rs | 2 +- ofborg/src/easyamqp.rs | 38 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 2b342d8..2a0b418 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -57,8 +57,8 @@ fn main() { consumer_tag: format!("{}-builder", cfg.whoami()), no_local: false, no_ack: false, + no_wait: false, exclusive: false, - nowait: false, arguments: None }, ) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index bf0354c..ac43b75 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -5,12 +5,46 @@ use amqp; use amqp::Basic; pub struct ConsumeConfig { + /// Specifies the name of the queue to consume from. pub queue: String, + + /// Specifies the identifier for the consumer. The consumer tag is + /// local to a channel, so two clients can use the same consumer + /// tags. If this field is empty the server will generate a unique + /// tag. + /// + /// The client MUST NOT specify a tag that refers to an existing + /// consumer. Error code: not-allowed pub consumer_tag: String, + + /// If the no-local field is set the server will not send messages + /// to the connection that published them. pub no_local: bool, + + /// If this field is set the server does not expect + /// acknowledgements for messages. That is, when a message is + /// delivered to the client the server assumes the delivery will + /// succeed and immediately dequeues it. This functionality may + /// increase performance but at the cost of reliability. Messages + /// can get lost if a client dies before they are delivered to the + /// application. pub no_ack: bool, + + /// Request exclusive consumer access, meaning only this consumer + /// can access the queue. + /// + /// The client MAY NOT gain exclusive access to a queue that + /// already has active consumers. Error code: access-refused pub exclusive: bool, - pub nowait: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + pub no_wait: bool, + + /// A set of arguments for the consume. The syntax and semantics + /// of these arguments depends on the server implementation. pub arguments: Option, } @@ -59,7 +93,7 @@ impl TypedWrappers for amqp::Channel { config.no_local, config.no_ack, config.exclusive, - config.nowait, + config.no_wait, config.arguments.unwrap_or(amqp::Table::new()), ) } From 634f2bb51fba80c67059b48fe4fc76357e1d8b6a Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Tue, 30 Jan 2018 22:23:21 -0500 Subject: [PATCH 05/10] Add a typed wrapper to exchange declarations --- ofborg/src/easyamqp.rs | 127 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index ac43b75..9ccaf78 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -48,6 +48,111 @@ pub struct ConsumeConfig { pub arguments: Option, } +pub enum ExchangeType { + Topic, + Headers, + Fanout, + Direct, + Custom(String), +} + +impl Into for ExchangeType { + fn into(self) -> String { + match self { + ExchangeType::Topic => "topic".to_owned(), + ExchangeType::Headers => "headers".to_owned(), + ExchangeType::Fanout => "fanout".to_owned(), + ExchangeType::Direct => "direct".to_owned(), + ExchangeType::Custom(x) => x, + } + } +} + +pub struct ExchangeConfig { + /// Exchange names starting with "amq." are reserved for + /// pre-declared and standardised exchanges. The client MAY + /// declare an exchange starting with "amq." if the passive option + /// is set, or the exchange already exists. Error code: + /// access-refused + /// + /// The exchange name consists of a non-empty sequence of these + /// characters: letters, digits, hyphen, underscore, period, or + /// colon. Error code: precondition-failed + exchange: String, + + /// Each exchange belongs to one of a set of exchange types + /// implemented by the server. The exchange types define the + /// functionality of the exchange - i.e. how messages are routed + /// through it. It is not valid or meaningful to attempt to change + /// the type of an existing exchange. + /// + /// Exchanges cannot be redeclared with different types. The + /// client MUST not attempt to redeclare an existing exchange with + /// a different type than used in the original Exchange.Declare + /// method. Error code: not-allowed + /// + /// The client MUST NOT attempt to declare an exchange with a type + /// that the server does not support. Error code: command-invalid + exchange_type: ExchangeType, + + /// If set, the server will reply with Declare-Ok if the exchange + /// already exists with the same name, and raise an error if not. + /// The client can use this to check whether an exchange exists + /// without modifying the server state. When set, all other method + /// fields except name and no-wait are ignored. A declare with + /// both passive and no-wait has no effect. Arguments are compared + /// for semantic equivalence. + /// + /// If set, and the exchange does not already exist, the server + /// MUST raise a channel exception with reply code 404 (not + /// found). + /// + /// If not set and the exchange exists, the server MUST check that + /// the existing exchange has the same values for type, durable, + /// and arguments fields. The server MUST respond with Declare-Ok + /// if the requested exchange matches these fields, and MUST raise + /// a channel exception if not. + passive: bool, + + /// If set when creating a new exchange, the exchange will be + /// marked as durable. Durable exchanges remain active when a + /// server restarts. Non-durable exchanges (transient exchanges) + /// are purged if/when a server restarts. + /// + /// The server MUST support both durable and transient exchanges. + durable: bool, + + /// If set, the exchange is deleted when all queues have finished + /// using it. + /// + /// The server SHOULD allow for a reasonable delay between the + /// point when it determines that an exchange is not being used + /// (or no longer used), and the point when it deletes the + /// exchange. At the least it must allow a client to create an + /// exchange and then bind a queue to it, with a small but + /// non-zero delay between these two actions. + /// + /// The server MUST ignore the auto-delete field if the exchange + /// already exists. + auto_delete: bool, + + /// If set, the exchange may not be used directly by publishers, + /// but only when bound to other exchanges. Internal exchanges are + /// used to construct wiring that is not visible to applications. + internal: bool, + + /// If set, the server will not respond to the method. The client + /// should not wait for a reply method. If the server could not + /// complete the method it will raise a channel or connection + /// exception. + nowait: bool, + + /// A set of arguments for the declaration. The syntax and + /// semantics of these arguments depends on the server + /// implementation. + arguments: Option, +} + pub fn session_from_config(config: &RabbitMQConfig) -> Result { let scheme = if config.ssl { @@ -79,6 +184,11 @@ pub trait TypedWrappers { fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result where T: amqp::Consumer + 'static; + + fn declare_exchange(&mut self, config: ExchangeConfig) + -> Result + where T: amqp::Consumer + 'static; + } impl TypedWrappers for amqp::Channel { @@ -97,4 +207,21 @@ impl TypedWrappers for amqp::Channel { config.arguments.unwrap_or(amqp::Table::new()), ) } + + fn declare_exchange(&mut self, config: ExchangeConfig) + -> Result + where T: amqp::Consumer + 'static + { + self.exchange_declare( + config.exchange, + config.exchange_type.into(), + config.passive, + config.durable, + config.auto_delete, + config.internal, + config.nowait, + config.arguments.unwrap_or(amqp::Table::new()) + ) + } + } From 8ff9fa96380c5af9e30b9d9818a61828232526a6 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:06:40 -0500 Subject: [PATCH 06/10] Partial conversion to a typed wrappper --- ofborg/src/bin/github-comment-filter.rs | 4 +++- ofborg/src/bin/log-message-collector.rs | 6 ++++-- ofborg/src/bin/log-message-generator.rs | 2 ++ ofborg/src/bin/mass-rebuilder.rs | 2 ++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 353b608..bf31ebe 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -16,6 +16,8 @@ use amqp::Table; use ofborg::config; use ofborg::worker; use ofborg::tasks; +use ofborg::easyamqp; +use ofborg::easyamqp::TypedWrappers; fn main() { @@ -25,7 +27,7 @@ fn main() { println!("Hello, world!"); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); let mut channel = session.open_channel(1).unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index b35e1b6..a831ea9 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -12,13 +12,15 @@ use ofborg::config; use ofborg::worker; use ofborg::tasks; use amqp::Basic; +use ofborg::easyamqp; +use ofborg::easyamqp::TypedWrappers; + fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); ofborg::setup_log(); - - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); let mut channel = session.open_channel(1).unwrap(); diff --git a/ofborg/src/bin/log-message-generator.rs b/ofborg/src/bin/log-message-generator.rs index 97b2337..9eaf6f9 100644 --- a/ofborg/src/bin/log-message-generator.rs +++ b/ofborg/src/bin/log-message-generator.rs @@ -13,6 +13,8 @@ use ofborg::config; use ofborg::notifyworker; use ofborg::tasks::build; use ofborg::message::buildjob; +use ofborg::easyamqp; +use ofborg::easyamqp::TypedWrappers; fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index d15615f..1b70699 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -13,6 +13,8 @@ use ofborg::worker; use amqp::Session; use amqp::Table; use amqp::Basic; +use ofborg::easyamqp; +use ofborg::easyamqp::TypedWrappers; fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); From 5e357d5e01b96f8bd740f1d2c9c5ddd884e121f8 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:27:11 -0500 Subject: [PATCH 07/10] Migrate fully to session_from_config and the typed consume() wrapper --- ofborg/src/bin/build-faker.rs | 6 ++---- ofborg/src/bin/github-comment-filter.rs | 20 ++++++++++---------- ofborg/src/bin/log-message-collector.rs | 23 ++++++++++++----------- ofborg/src/bin/log-message-generator.rs | 4 +--- ofborg/src/bin/mass-rebuilder.rs | 22 +++++++++++----------- 5 files changed, 36 insertions(+), 39 deletions(-) diff --git a/ofborg/src/bin/build-faker.rs b/ofborg/src/bin/build-faker.rs index 2d56cb4..7752db4 100644 --- a/ofborg/src/bin/build-faker.rs +++ b/ofborg/src/bin/build-faker.rs @@ -9,15 +9,13 @@ extern crate hyper_native_tls; use std::env; -use amqp::Session; - use ofborg::config; use ofborg::worker; use ofborg::notifyworker; use ofborg::notifyworker::NotificationReceiver; use ofborg::commentparser; use ofborg::message::buildjob; - +use ofborg::easyamqp; use ofborg::message::{Pr, Repo}; fn main() { @@ -27,7 +25,7 @@ fn main() { println!("Hello, world!"); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index bf31ebe..25763e3 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -10,8 +10,6 @@ extern crate hyper_native_tls; use std::env; use amqp::Basic; -use amqp::Session; -use amqp::Table; use ofborg::config; use ofborg::worker; @@ -34,18 +32,20 @@ fn main() { channel.basic_prefetch(1).unwrap(); channel - .basic_consume( + .consume( worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new( cfg.acl(), cfg.github(), )), - "build-inputs", - format!("{}-github-comment-filter", cfg.whoami()).as_ref(), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: "build-inputs".to_owned(), + consumer_tag: format!("{}-github-comment-filter", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index a831ea9..76f8de5 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -5,13 +5,11 @@ extern crate env_logger; use std::env; use std::path::PathBuf; -use amqp::Session; use amqp::Table; use ofborg::config; use ofborg::worker; use ofborg::tasks; -use amqp::Basic; use ofborg::easyamqp; use ofborg::easyamqp::TypedWrappers; @@ -49,22 +47,25 @@ fn main() { .unwrap(); - channel - .basic_consume( + channel + .consume( worker::new(tasks::log_message_collector::LogMessageCollector::new( PathBuf::from(cfg.log_storage.clone().unwrap().path), 100, )), - queue_name, - format!("{}-log-collector", cfg.whoami()), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: queue_name, + consumer_tag: format!("{}-log-collector", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap(); + channel.start_consuming(); println!("Finished consuming?"); diff --git a/ofborg/src/bin/log-message-generator.rs b/ofborg/src/bin/log-message-generator.rs index 9eaf6f9..6d93544 100644 --- a/ofborg/src/bin/log-message-generator.rs +++ b/ofborg/src/bin/log-message-generator.rs @@ -6,7 +6,6 @@ use std::env; use std::time::Duration; use std::thread; -use amqp::Session; use ofborg::message::{Pr, Repo}; use ofborg::config; @@ -14,13 +13,12 @@ use ofborg::notifyworker; use ofborg::tasks::build; use ofborg::message::buildjob; use ofborg::easyamqp; -use ofborg::easyamqp::TypedWrappers; fn main() { let cfg = config::load(env::args().nth(1).unwrap().as_ref()); ofborg::setup_log(); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); println!("About to open channel #1"); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index 1b70699..cded68b 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -10,8 +10,6 @@ use ofborg::checkout; use ofborg::stats; use ofborg::worker; -use amqp::Session; -use amqp::Table; use amqp::Basic; use ofborg::easyamqp; use ofborg::easyamqp::TypedWrappers; @@ -24,7 +22,7 @@ fn main() { println!("Hello, world!"); - let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); + let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap(); println!("Connected to rabbitmq"); let mut channel = session.open_channel(1).unwrap(); @@ -44,15 +42,17 @@ fn main() { channel.basic_prefetch(1).unwrap(); channel - .basic_consume( + .consume( worker::new(mrw), - "mass-rebuild-check-jobs", - format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(), - false, - false, - false, - false, - Table::new(), + easyamqp::ConsumeConfig { + queue: "mass-rebuild-check-jobs".to_owned(), + consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()), + no_local: false, + no_ack: false, + no_wait: false, + exclusive: false, + arguments: None + }, ) .unwrap(); From 26e0612d5e4e3479b8625298bffac18d7fd8267a Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:29:44 -0500 Subject: [PATCH 08/10] rustfmt --- ofborg/src/bin/builder.rs | 6 +-- ofborg/src/bin/github-comment-filter.rs | 2 +- ofborg/src/bin/log-message-collector.rs | 4 +- ofborg/src/bin/mass-rebuilder.rs | 2 +- ofborg/src/easyamqp.rs | 57 +++++++++++++------------ 5 files changed, 35 insertions(+), 36 deletions(-) diff --git a/ofborg/src/bin/builder.rs b/ofborg/src/bin/builder.rs index 2a0b418..538252f 100644 --- a/ofborg/src/bin/builder.rs +++ b/ofborg/src/bin/builder.rs @@ -26,9 +26,7 @@ fn main() { let nix = cfg.nix(); let full_logs: bool = match &cfg.feedback { - &Some(ref feedback) => { - feedback.full_logs - } + &Some(ref feedback) => feedback.full_logs, &None => { warn!("Please define feedback.full_logs in your configuration to true or false!"); warn!("feedback.full_logs when true will cause the full build log to be sent back"); @@ -59,7 +57,7 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None + arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/github-comment-filter.rs b/ofborg/src/bin/github-comment-filter.rs index 25763e3..f75bc83 100644 --- a/ofborg/src/bin/github-comment-filter.rs +++ b/ofborg/src/bin/github-comment-filter.rs @@ -44,7 +44,7 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None + arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/log-message-collector.rs b/ofborg/src/bin/log-message-collector.rs index 76f8de5..f6eb773 100644 --- a/ofborg/src/bin/log-message-collector.rs +++ b/ofborg/src/bin/log-message-collector.rs @@ -47,7 +47,7 @@ fn main() { .unwrap(); - channel + channel .consume( worker::new(tasks::log_message_collector::LogMessageCollector::new( PathBuf::from(cfg.log_storage.clone().unwrap().path), @@ -60,7 +60,7 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None + arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/bin/mass-rebuilder.rs b/ofborg/src/bin/mass-rebuilder.rs index cded68b..0810fad 100644 --- a/ofborg/src/bin/mass-rebuilder.rs +++ b/ofborg/src/bin/mass-rebuilder.rs @@ -51,7 +51,7 @@ fn main() { no_ack: false, no_wait: false, exclusive: false, - arguments: None + arguments: None, }, ) .unwrap(); diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 9ccaf78..03a24ef 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -153,8 +153,7 @@ pub struct ExchangeConfig { arguments: Option, } -pub fn session_from_config(config: &RabbitMQConfig) - -> Result { +pub fn session_from_config(config: &RabbitMQConfig) -> Result { let scheme = if config.ssl { amqp::AMQPScheme::AMQPS } else { @@ -165,36 +164,36 @@ pub fn session_from_config(config: &RabbitMQConfig) // properties.insert("identity".to_owned(), amqp::TableEntry::LongString(identity.to_owned())); properties.insert( "ofborg_version".to_owned(), - amqp::TableEntry::LongString(ofborg::VERSION.to_owned()) + amqp::TableEntry::LongString(ofborg::VERSION.to_owned()), ); - amqp::Session::new( - amqp::Options{ - host: config.host.clone(), - login: config.username.clone(), - password: config.password.clone(), - scheme: scheme, - properties: properties, - .. amqp::Options::default() - } - ) + amqp::Session::new(amqp::Options { + host: config.host.clone(), + login: config.username.clone(), + password: config.password.clone(), + scheme: scheme, + properties: properties, + ..amqp::Options::default() + }) } pub trait TypedWrappers { - fn consume(&mut self, callback: T, config: ConsumeConfig) - -> Result - where T: amqp::Consumer + 'static; - - fn declare_exchange(&mut self, config: ExchangeConfig) - -> Result - where T: amqp::Consumer + 'static; + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result + where + T: amqp::Consumer + 'static; + fn declare_exchange( + &mut self, + config: ExchangeConfig, + ) -> Result + where + T: amqp::Consumer + 'static; } impl TypedWrappers for amqp::Channel { - fn consume(&mut self, callback: T, config: ConsumeConfig) - -> Result - where T: amqp::Consumer + 'static + fn consume(&mut self, callback: T, config: ConsumeConfig) -> Result + where + T: amqp::Consumer + 'static, { self.basic_consume( callback, @@ -208,9 +207,12 @@ impl TypedWrappers for amqp::Channel { ) } - fn declare_exchange(&mut self, config: ExchangeConfig) - -> Result - where T: amqp::Consumer + 'static + fn declare_exchange( + &mut self, + config: ExchangeConfig, + ) -> Result + where + T: amqp::Consumer + 'static, { self.exchange_declare( config.exchange, @@ -220,8 +222,7 @@ impl TypedWrappers for amqp::Channel { config.auto_delete, config.internal, config.nowait, - config.arguments.unwrap_or(amqp::Table::new()) + config.arguments.unwrap_or(amqp::Table::new()), ) } - } From b47704d6e486b8bae0301477f013fdfd1c4b1f2f Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:29:57 -0500 Subject: [PATCH 09/10] unrelated changes with rustfmt --- ofborg/src/tasks/githubcommentfilter.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/ofborg/src/tasks/githubcommentfilter.rs b/ofborg/src/tasks/githubcommentfilter.rs index 81503e1..55c54c8 100644 --- a/ofborg/src/tasks/githubcommentfilter.rs +++ b/ofborg/src/tasks/githubcommentfilter.rs @@ -53,15 +53,14 @@ impl worker::SimpleWorker for GitHubCommentWorker { return vec![worker::Action::Ack]; } - let build_destinations: Vec<(Option,Option)>; + let build_destinations: Vec<(Option, Option)>; if self.acl.can_build_unrestricted( &job.comment.user.login, &job.repository.full_name, - ) { - build_destinations = vec![ - (Some("build-jobs".to_owned()), None) - ]; + ) + { + build_destinations = vec![(Some("build-jobs".to_owned()), None)]; } else if self.acl.can_build_restricted( &job.comment.user.login, &job.repository.full_name, @@ -140,11 +139,7 @@ impl worker::SimpleWorker for GitHubCommentWorker { }; for (exch, rk) in build_destinations.clone() { - response.push(worker::publish_serde_action( - exch, - rk, - &msg, - )); + response.push(worker::publish_serde_action(exch, rk, &msg)); } } commentparser::Instruction::Eval => { From 836028cf1687311561b4378012447a5435283539 Mon Sep 17 00:00:00 2001 From: Graham Christensen Date: Wed, 31 Jan 2018 09:46:23 -0500 Subject: [PATCH 10/10] Fixup connection issues --- ofborg/src/easyamqp.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/ofborg/src/easyamqp.rs b/ofborg/src/easyamqp.rs index 03a24ef..5d55cbf 100644 --- a/ofborg/src/easyamqp.rs +++ b/ofborg/src/easyamqp.rs @@ -161,20 +161,29 @@ pub fn session_from_config(config: &RabbitMQConfig) -> Result 5671, + amqp::AMQPScheme::AMQP => 5672, + }, + vhost: "/".to_owned(), login: config.username.clone(), password: config.password.clone(), scheme: scheme, properties: properties, ..amqp::Options::default() - }) + }; + + let session = try!(amqp::Session::new(options)); + + info!("Connected to {}", &config.host); + return Ok(session); } pub trait TypedWrappers {