debugging...

This commit is contained in:
Graham Christensen 2017-11-10 18:38:02 -05:00
parent d25727e649
commit e865b9164f
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
2 changed files with 98 additions and 5 deletions

View file

@ -2,6 +2,7 @@ extern crate ofborg;
extern crate amqp; extern crate amqp;
extern crate env_logger; extern crate env_logger;
use std::{thread, time};
use std::collections::LinkedList; use std::collections::LinkedList;
use std::env; use std::env;
use amqp::protocol::basic::{Deliver,BasicProperties}; use amqp::protocol::basic::{Deliver,BasicProperties};
@ -9,6 +10,7 @@ use amqp::protocol::basic::{Deliver,BasicProperties};
use std::path::Path; use std::path::Path;
use amqp::Basic; use amqp::Basic;
use amqp::Session; use amqp::Session;
use amqp::Channel;
use amqp::Table; use amqp::Table;
use std::process; use std::process;
use std::io::Error; use std::io::Error;
@ -22,6 +24,7 @@ use ofborg::worker;
use ofborg::message::buildjob; use ofborg::message::buildjob;
use ofborg::nix; use ofborg::nix;
fn main() { fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref()); let cfg = config::load(env::args().nth(1).unwrap().as_ref());
env_logger::init().unwrap(); env_logger::init().unwrap();
@ -29,7 +32,76 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
let mut channel = session.open_channel(1).unwrap(); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let mut hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1");
//queue: &str, passive, durable, exclusive, auto_delete, nowait, arguments: Table
match hbchan.queue_declare("", false, false, true, true, false, Table::new()) {
Err(problem) => {
println!("Failed to declare a queue: {:?}", problem);
process::exit(1);
}
Ok(resp) => {
println!("Got personal queue: {:?}", resp);
let queueName = resp.queue;
hbchan.basic_publish(
"",
queueName.as_ref(),
true,
false,
BasicProperties {
content_type: Some("text".to_owned()),
..Default::default()
},
(b"Hello from rust!").to_vec()
).unwrap();
let qnameC = queueName.clone();
thread::spawn(move || {
hbchan.basic_consume(
move |chan: &mut Channel, deliver: Deliver, headers: BasicProperties, data: Vec<u8>|
{
chan.basic_publish(
"",
qnameC.as_ref(),
true,
false,
BasicProperties {
content_type: Some("text".to_owned()),
..Default::default()
},
(b"Hello from rust!").to_vec()
).unwrap();
let ten_sec = time::Duration::from_secs(1);
thread::sleep(ten_sec);
println!("Got a plastic heartbeat <3");
chan.basic_ack(deliver.delivery_tag, false).unwrap();
},
queueName,
String::from("hbchanner1"),
false,
false,
false,
false,
Table::new()
);
hbchan.start_consuming();
println!("Erm... in the heartbaets");
process::exit(1);
});
}
}
}
let mut channel = session.open_channel(2).unwrap();
//queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table //queue: &str, passive: bool, durable: bool, exclusive: bool, auto_delete: bool, nowait: bool, arguments: Table
if let Err(problem) = channel.queue_declare("my_queue_name", false, true, false, false, false, Table::new()) { if let Err(problem) = channel.queue_declare("my_queue_name", false, true, false, false, false, Table::new()) {
@ -55,10 +127,21 @@ fn main() {
Table::new() Table::new()
); );
let ten_sec = time::Duration::from_secs(10);
thread::sleep(ten_sec);
channel.start_consuming(); channel.start_consuming();
println!("Finished consuming?");
channel.close(200, "Bye").unwrap(); channel.close(200, "Bye").unwrap();
println!("Closed the channel");
session.close(200, "Good Bye"); session.close(200, "Good Bye");
println!("Closed the session... EOF");
} }

View file

@ -55,28 +55,38 @@ impl <T: SimpleWorker + Send> Consumer for Worker<T> {
headers: BasicProperties, headers: BasicProperties,
body: Vec<u8>) { body: Vec<u8>) {
let job = self.internal.msg_to_job(&method, &headers, &body).unwrap(); let job = self.internal.msg_to_job(&method, &headers, &body).unwrap();
for action in self.internal.consumer(&job) { for action in self.internal.consumer(&job) {
match action { match action {
Action::Ack => { Action::Ack => {
println!("Ack");
channel.basic_ack(method.delivery_tag, false).unwrap(); channel.basic_ack(method.delivery_tag, false).unwrap();
} }
Action::NackRequeue => { Action::NackRequeue => {
println!("Nack Requeue");
channel.basic_nack(method.delivery_tag, false, true).unwrap(); channel.basic_nack(method.delivery_tag, false, true).unwrap();
} }
Action::NackDump => { Action::NackDump => {
println!("Nack Dump");
channel.basic_nack(method.delivery_tag, false, false).unwrap(); channel.basic_nack(method.delivery_tag, false, false).unwrap();
} }
Action::Publish(msg) => { Action::Publish(msg) => {
let exch = msg.exchange.clone().unwrap_or("".to_owned());
let key = msg.routing_key.clone().unwrap_or("".to_owned());
println!("Publishing, {:?} -> {:?}: {:?}", exch, key, msg);
let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()}); let props = msg.properties.unwrap_or(BasicProperties{ ..Default::default()});
channel.basic_publish( println!("{:?}", channel.basic_publish(
msg.exchange.unwrap_or("".to_owned()), exch,
msg.routing_key.unwrap_or("".to_owned()), key,
msg.mandatory, msg.mandatory,
msg.immediate, msg.immediate,
props, props,
msg.content msg.content
).unwrap(); ).unwrap());
} }
} }
} }