Merge pull request #50 from NixOS/typed-wrapper

Typed wrapper
This commit is contained in:
Graham Christensen 2018-01-31 10:02:22 -05:00 committed by GitHub
commit 1b19b4ba2f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 315 additions and 88 deletions

View file

@ -49,8 +49,8 @@ let kernel = buildPlatform.parsed.kernel.name;
authors = [ "Andrii Dmytrenko <andrey@reevoo.com>" ];
src = fetchgit {
url = "https://github.com/grahamc/rust-amqp.git";
rev = "b3eae6b0b8458d32134581c53a918e6e52c7c329";
sha256 = "01bma6ig0c7gvrj9b4jj3h977vg1mij9a9hzvvp6an06d74zfk4w";
rev = "f9aec2f40aef69a459f26003ce47048f8e2a08d1";
sha256 = "09k6fl7l0rcwilnckdfv3smiv1ilrwi1jxmrrkjwbrj64lky3jdy";
};
inherit dependencies buildDependencies features;
};

2
ofborg/Cargo.lock generated
View file

@ -30,7 +30,7 @@ dependencies = [
[[package]]
name = "amqp"
version = "0.1.0"
source = "git+https://github.com/grahamc/rust-amqp.git#b3eae6b0b8458d32134581c53a918e6e52c7c329"
source = "git+https://github.com/grahamc/rust-amqp.git#f9aec2f40aef69a459f26003ce47048f8e2a08d1"
dependencies = [
"amq-proto 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -9,15 +9,13 @@ extern crate hyper_native_tls;
use std::env;
use amqp::Session;
use ofborg::config;
use ofborg::worker;
use ofborg::notifyworker;
use ofborg::notifyworker::NotificationReceiver;
use ofborg::commentparser;
use ofborg::message::buildjob;
use ofborg::easyamqp;
use ofborg::message::{Pr, Repo};
fn main() {
@ -27,7 +25,7 @@ fn main() {
println!("Hello, world!");
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");

View file

@ -9,13 +9,12 @@ use std::env;
use std::path::Path;
use amqp::Basic;
use amqp::Session;
use amqp::Table;
use ofborg::config;
use ofborg::checkout;
use ofborg::notifyworker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
fn main() {
@ -23,36 +22,27 @@ fn main() {
ofborg::setup_log();
println!("Hello, world!");
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq");
let mut channel = session.open_channel(1).unwrap();
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix();
let full_logs: bool;
match &cfg.feedback {
&Some(ref feedback) => {
full_logs = feedback.full_logs;
}
let full_logs: bool = match &cfg.feedback {
&Some(ref feedback) => feedback.full_logs,
&None => {
warn!("Please define feedback.full_logs in your configuration to true or false!");
warn!("feedback.full_logs when true will cause the full build log to be sent back");
warn!("to the server, and be viewable by everyone.");
warn!("I strongly encourage everybody turn this on!");
full_logs = false;
}
false
}
};
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
let mut channel = session.open_channel(1).unwrap();
channel.basic_prefetch(1).unwrap();
channel
.basic_consume(
.consume(
notifyworker::new(tasks::build::BuildWorker::new(
cloner,
nix,
@ -60,20 +50,19 @@ fn main() {
cfg.runner.identity.clone(),
full_logs,
)),
format!("build-inputs-{}", cfg.nix.system.clone()).as_ref(),
format!("{}-builder", cfg.whoami()).as_ref(),
false,
false,
false,
false,
Table::new(),
easyamqp::ConsumeConfig {
queue: format!("build-inputs-{}", cfg.nix.system.clone()),
consumer_tag: format!("{}-builder", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye");

View file

@ -10,12 +10,12 @@ extern crate hyper_native_tls;
use std::env;
use amqp::Basic;
use amqp::Session;
use amqp::Table;
use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
fn main() {
@ -25,25 +25,27 @@ fn main() {
println!("Hello, world!");
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let mut channel = session.open_channel(1).unwrap();
channel.basic_prefetch(1).unwrap();
channel
.basic_consume(
.consume(
worker::new(tasks::githubcommentfilter::GitHubCommentWorker::new(
cfg.acl(),
cfg.github(),
)),
"build-inputs",
format!("{}-github-comment-filter", cfg.whoami()).as_ref(),
false,
false,
false,
false,
Table::new(),
easyamqp::ConsumeConfig {
queue: "build-inputs".to_owned(),
consumer_tag: format!("{}-github-comment-filter", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();

View file

@ -5,20 +5,20 @@ extern crate env_logger;
use std::env;
use std::path::PathBuf;
use amqp::Session;
use amqp::Table;
use ofborg::config;
use ofborg::worker;
use ofborg::tasks;
use amqp::Basic;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
ofborg::setup_log();
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let mut channel = session.open_channel(1).unwrap();
@ -48,21 +48,24 @@ fn main() {
channel
.basic_consume(
.consume(
worker::new(tasks::log_message_collector::LogMessageCollector::new(
PathBuf::from(cfg.log_storage.clone().unwrap().path),
100,
)),
queue_name,
format!("{}-log-collector", cfg.whoami()),
false,
false,
false,
false,
Table::new(),
easyamqp::ConsumeConfig {
queue: queue_name,
consumer_tag: format!("{}-log-collector", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();
channel.start_consuming();
println!("Finished consuming?");

View file

@ -6,19 +6,19 @@ use std::env;
use std::time::Duration;
use std::thread;
use amqp::Session;
use ofborg::message::{Pr, Repo};
use ofborg::config;
use ofborg::notifyworker;
use ofborg::tasks::build;
use ofborg::message::buildjob;
use ofborg::easyamqp;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
ofborg::setup_log();
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
println!("About to open channel #1");

View file

@ -10,9 +10,9 @@ use ofborg::checkout;
use ofborg::stats;
use ofborg::worker;
use amqp::Session;
use amqp::Table;
use amqp::Basic;
use ofborg::easyamqp;
use ofborg::easyamqp::TypedWrappers;
fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref());
@ -22,7 +22,7 @@ fn main() {
println!("Hello, world!");
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut session = easyamqp::session_from_config(&cfg.rabbitmq).unwrap();
println!("Connected to rabbitmq");
let mut channel = session.open_channel(1).unwrap();
@ -42,15 +42,17 @@ fn main() {
channel.basic_prefetch(1).unwrap();
channel
.basic_consume(
.consume(
worker::new(mrw),
"mass-rebuild-check-jobs",
format!("{}-mass-rebuild-checker", cfg.whoami()).as_ref(),
false,
false,
false,
false,
Table::new(),
easyamqp::ConsumeConfig {
queue: "mass-rebuild-check-jobs".to_owned(),
consumer_tag: format!("{}-mass-rebuild-checker", cfg.whoami()),
no_local: false,
no_ack: false,
no_wait: false,
exclusive: false,
arguments: None,
},
)
.unwrap();

View file

@ -27,7 +27,7 @@ pub struct FeedbackConfig {
pub full_logs: bool,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct RabbitMQConfig {
pub ssl: bool,
pub host: String,
@ -107,7 +107,6 @@ impl Config {
}
}
impl RabbitMQConfig {
pub fn as_uri(&self) -> String {
return format!(

237
ofborg/src/easyamqp.rs Normal file
View file

@ -0,0 +1,237 @@
use ofborg;
use ofborg::config::RabbitMQConfig;
use amqp;
use amqp::Basic;
pub struct ConsumeConfig {
/// Specifies the name of the queue to consume from.
pub queue: String,
/// Specifies the identifier for the consumer. The consumer tag is
/// local to a channel, so two clients can use the same consumer
/// tags. If this field is empty the server will generate a unique
/// tag.
///
/// The client MUST NOT specify a tag that refers to an existing
/// consumer. Error code: not-allowed
pub consumer_tag: String,
/// If the no-local field is set the server will not send messages
/// to the connection that published them.
pub no_local: bool,
/// If this field is set the server does not expect
/// acknowledgements for messages. That is, when a message is
/// delivered to the client the server assumes the delivery will
/// succeed and immediately dequeues it. This functionality may
/// increase performance but at the cost of reliability. Messages
/// can get lost if a client dies before they are delivered to the
/// application.
pub no_ack: bool,
/// Request exclusive consumer access, meaning only this consumer
/// can access the queue.
///
/// The client MAY NOT gain exclusive access to a queue that
/// already has active consumers. Error code: access-refused
pub exclusive: bool,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
pub no_wait: bool,
/// A set of arguments for the consume. The syntax and semantics
/// of these arguments depends on the server implementation.
pub arguments: Option<amqp::Table>,
}
pub enum ExchangeType {
Topic,
Headers,
Fanout,
Direct,
Custom(String),
}
impl Into<String> for ExchangeType {
fn into(self) -> String {
match self {
ExchangeType::Topic => "topic".to_owned(),
ExchangeType::Headers => "headers".to_owned(),
ExchangeType::Fanout => "fanout".to_owned(),
ExchangeType::Direct => "direct".to_owned(),
ExchangeType::Custom(x) => x,
}
}
}
pub struct ExchangeConfig {
/// Exchange names starting with "amq." are reserved for
/// pre-declared and standardised exchanges. The client MAY
/// declare an exchange starting with "amq." if the passive option
/// is set, or the exchange already exists. Error code:
/// access-refused
///
/// The exchange name consists of a non-empty sequence of these
/// characters: letters, digits, hyphen, underscore, period, or
/// colon. Error code: precondition-failed
exchange: String,
/// Each exchange belongs to one of a set of exchange types
/// implemented by the server. The exchange types define the
/// functionality of the exchange - i.e. how messages are routed
/// through it. It is not valid or meaningful to attempt to change
/// the type of an existing exchange.
///
/// Exchanges cannot be redeclared with different types. The
/// client MUST not attempt to redeclare an existing exchange with
/// a different type than used in the original Exchange.Declare
/// method. Error code: not-allowed
///
/// The client MUST NOT attempt to declare an exchange with a type
/// that the server does not support. Error code: command-invalid
exchange_type: ExchangeType,
/// If set, the server will reply with Declare-Ok if the exchange
/// already exists with the same name, and raise an error if not.
/// The client can use this to check whether an exchange exists
/// without modifying the server state. When set, all other method
/// fields except name and no-wait are ignored. A declare with
/// both passive and no-wait has no effect. Arguments are compared
/// for semantic equivalence.
///
/// If set, and the exchange does not already exist, the server
/// MUST raise a channel exception with reply code 404 (not
/// found).
///
/// If not set and the exchange exists, the server MUST check that
/// the existing exchange has the same values for type, durable,
/// and arguments fields. The server MUST respond with Declare-Ok
/// if the requested exchange matches these fields, and MUST raise
/// a channel exception if not.
passive: bool,
/// If set when creating a new exchange, the exchange will be
/// marked as durable. Durable exchanges remain active when a
/// server restarts. Non-durable exchanges (transient exchanges)
/// are purged if/when a server restarts.
///
/// The server MUST support both durable and transient exchanges.
durable: bool,
/// If set, the exchange is deleted when all queues have finished
/// using it.
///
/// The server SHOULD allow for a reasonable delay between the
/// point when it determines that an exchange is not being used
/// (or no longer used), and the point when it deletes the
/// exchange. At the least it must allow a client to create an
/// exchange and then bind a queue to it, with a small but
/// non-zero delay between these two actions.
///
/// The server MUST ignore the auto-delete field if the exchange
/// already exists.
auto_delete: bool,
/// If set, the exchange may not be used directly by publishers,
/// but only when bound to other exchanges. Internal exchanges are
/// used to construct wiring that is not visible to applications.
internal: bool,
/// If set, the server will not respond to the method. The client
/// should not wait for a reply method. If the server could not
/// complete the method it will raise a channel or connection
/// exception.
nowait: bool,
/// A set of arguments for the declaration. The syntax and
/// semantics of these arguments depends on the server
/// implementation.
arguments: Option<amqp::Table>,
}
pub fn session_from_config(config: &RabbitMQConfig) -> Result<amqp::Session, amqp::AMQPError> {
let scheme = if config.ssl {
amqp::AMQPScheme::AMQPS
} else {
amqp::AMQPScheme::AMQP
};
let mut properties = amqp::Table::new();
properties.insert(
"ofborg_version".to_owned(),
amqp::TableEntry::LongString(ofborg::VERSION.to_owned()),
);
let options = amqp::Options {
host: config.host.clone(),
port: match scheme {
amqp::AMQPScheme::AMQPS => 5671,
amqp::AMQPScheme::AMQP => 5672,
},
vhost: "/".to_owned(),
login: config.username.clone(),
password: config.password.clone(),
scheme: scheme,
properties: properties,
..amqp::Options::default()
};
let session = try!(amqp::Session::new(options));
info!("Connected to {}", &config.host);
return Ok(session);
}
pub trait TypedWrappers {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError>
where
T: amqp::Consumer + 'static;
fn declare_exchange<T>(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where
T: amqp::Consumer + 'static;
}
impl TypedWrappers for amqp::Channel {
fn consume<T>(&mut self, callback: T, config: ConsumeConfig) -> Result<String, amqp::AMQPError>
where
T: amqp::Consumer + 'static,
{
self.basic_consume(
callback,
config.queue,
config.consumer_tag,
config.no_local,
config.no_ack,
config.exclusive,
config.no_wait,
config.arguments.unwrap_or(amqp::Table::new()),
)
}
fn declare_exchange<T>(
&mut self,
config: ExchangeConfig,
) -> Result<amqp::protocol::exchange::DeclareOk, amqp::AMQPError>
where
T: amqp::Consumer + 'static,
{
self.exchange_declare(
config.exchange,
config.exchange_type.into(),
config.passive,
config.durable,
config.auto_delete,
config.internal,
config.nowait,
config.arguments.unwrap_or(amqp::Table::new()),
)
}
}

View file

@ -40,6 +40,7 @@ pub mod asynccmd;
pub mod notifyworker;
pub mod writetoline;
pub mod test_scratch;
pub mod easyamqp;
pub mod ofborg {
pub use asynccmd;
@ -62,8 +63,9 @@ pub mod ofborg {
pub use tagger;
pub use writetoline;
pub use test_scratch;
pub use easyamqp;
pub const VERSION: &'static str = env!("CARGO_PKG_VERSION");
}
pub fn setup_log() {

View file

@ -53,15 +53,14 @@ impl worker::SimpleWorker for GitHubCommentWorker {
return vec![worker::Action::Ack];
}
let build_destinations: Vec<(Option<String>,Option<String>)>;
let build_destinations: Vec<(Option<String>, Option<String>)>;
if self.acl.can_build_unrestricted(
&job.comment.user.login,
&job.repository.full_name,
) {
build_destinations = vec![
(Some("build-jobs".to_owned()), None)
];
)
{
build_destinations = vec![(Some("build-jobs".to_owned()), None)];
} else if self.acl.can_build_restricted(
&job.comment.user.login,
&job.repository.full_name,
@ -140,11 +139,7 @@ impl worker::SimpleWorker for GitHubCommentWorker {
};
for (exch, rk) in build_destinations.clone() {
response.push(worker::publish_serde_action(
exch,
rk,
&msg,
));
response.push(worker::publish_serde_action(exch, rk, &msg));
}
}
commentparser::Instruction::Eval => {

View file

@ -129,7 +129,7 @@ try {
}
if (!isset($input->repository)) {
throw new\ExecutionFailureException('Dataset does not have a repository');
throw new ExecutionFailureException('Dataset does not have a repository');
}
if (!isset($input->repository->full_name)) {