Replace the plastic heartbeats with proper heartbeat support

This commit is contained in:
Graham Christensen 2018-01-28 21:38:47 -05:00
parent d5ac9e6bc7
commit 9e829b4a2e
No known key found for this signature in database
GPG key ID: ACA1C1D120C83D5C
9 changed files with 6 additions and 170 deletions

View file

@ -18,10 +18,7 @@ use ofborg::notifyworker::NotificationReceiver;
use ofborg::commentparser; use ofborg::commentparser;
use ofborg::message::buildjob; use ofborg::message::buildjob;
use ofborg::message::{Pr, Repo}; use ofborg::message::{Pr, Repo};
use ofborg::tasks;
fn main() { fn main() {
let cfg = config::load(env::args().nth(1).unwrap().as_ref()); let cfg = config::load(env::args().nth(1).unwrap().as_ref());
@ -32,18 +29,9 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq"); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1");
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap(); let mut channel = session.open_channel(1).unwrap();
let repo_msg = Repo { let repo_msg = Repo {
clone_url: "https://github.com/nixos/ofborg.git".to_owned(), clone_url: "https://github.com/nixos/ofborg.git".to_owned(),
@ -72,7 +60,7 @@ fn main() {
{ {
let mut recv = notifyworker::ChannelNotificationReceiver::new(&mut channel, 0); let mut recv = notifyworker::ChannelNotificationReceiver::new(&mut channel, 0);
for i in 1..2 { for _i in 1..2 {
recv.tell(worker::publish_serde_action( recv.tell(worker::publish_serde_action(
None, None,
Some("build-inputs-x86_64-darwin".to_owned()), Some("build-inputs-x86_64-darwin".to_owned()),

View file

@ -28,16 +28,8 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq"); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1"); let mut channel = session.open_channel(1).unwrap();
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap();
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix(); let nix = cfg.nix();

View file

@ -27,17 +27,8 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq"); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1"); let mut channel = session.open_channel(1).unwrap();
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap();
channel.basic_prefetch(1).unwrap(); channel.basic_prefetch(1).unwrap();
channel channel

View file

@ -20,16 +20,8 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq"); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1"); let mut channel = session.open_channel(1).unwrap();
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap();
let queue_name = channel let queue_name = channel
.queue_declare( .queue_declare(

View file

@ -24,16 +24,8 @@ fn main() {
let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap(); let mut session = Session::open_url(&cfg.rabbitmq.as_uri()).unwrap();
println!("Connected to rabbitmq"); println!("Connected to rabbitmq");
{
println!("About to open channel #1");
let hbchan = session.open_channel(1).unwrap();
println!("Opened channel #1"); let mut channel = session.open_channel(1).unwrap();
tasks::heartbeat::start_on_channel(hbchan, cfg.whoami());
}
let mut channel = session.open_channel(2).unwrap();
let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root)); let cloner = checkout::cached_cloner(Path::new(&cfg.checkout.root));
let nix = cfg.nix(); let nix = cfg.nix();

View file

@ -2,7 +2,6 @@ mod common;
pub mod buildjob; pub mod buildjob;
pub mod buildresult; pub mod buildresult;
pub mod massrebuildjob; pub mod massrebuildjob;
pub mod plasticheartbeat;
pub mod buildlogmsg; pub mod buildlogmsg;
pub use self::common::{Pr, Repo}; pub use self::common::{Pr, Repo};

View file

@ -1,11 +0,0 @@
extern crate amqp;
extern crate env_logger;
use serde_json;
#[derive(Serialize, Deserialize, Debug)]
pub struct PlasticHeartbeat {}
pub fn from(data: &Vec<u8>) -> Result<PlasticHeartbeat, serde_json::error::Error> {
return serde_json::from_slice(&data);
}

View file

@ -1,106 +0,0 @@
use std::{thread, time};
use serde_json;
use ofborg::worker;
use ofborg::message::plasticheartbeat;
use amqp::Channel;
use amqp::Table;
use amqp::protocol::basic::{Deliver, BasicProperties};
use std::process;
use amqp::Basic;
struct PlasticHeartbeatWorker {
queue_name: String,
}
impl PlasticHeartbeatWorker {
fn message(&self) -> worker::QueueMsg {
return worker::QueueMsg {
exchange: None,
routing_key: Some(self.queue_name.clone()),
mandatory: false,
immediate: false,
properties: None,
content: serde_json::to_string(&plasticheartbeat::PlasticHeartbeat {})
.unwrap()
.into_bytes(),
};
}
}
impl worker::SimpleWorker for PlasticHeartbeatWorker {
type J = plasticheartbeat::PlasticHeartbeat;
fn msg_to_job(
&mut self,
_: &Deliver,
_: &BasicProperties,
body: &Vec<u8>,
) -> Result<Self::J, String> {
return match plasticheartbeat::from(body) {
Ok(e) => Ok(e),
Err(e) => {
println!("{:?}", String::from_utf8(body.clone()));
panic!("{:?}", e);
}
};
}
fn consumer(&mut self, _job: &plasticheartbeat::PlasticHeartbeat) -> worker::Actions {
thread::sleep(time::Duration::from_secs(5));
return vec![worker::Action::Publish(self.message()), worker::Action::Ack];
}
}
pub fn start_on_channel(mut hbchan: Channel, consumer_name: String) {
let queue_name = hbchan
.queue_declare(
"",
false, // passive
false, // durable
true, // exclusive
true, // auto_delete
false, //nowait
Table::new(),
)
.expect("Failed to declare an anon queue for PlasticHeartbeats!")
.queue;
println!("Got personal queue: {:?}", queue_name);
hbchan
.basic_publish(
"",
queue_name.as_ref(),
true, // mandatory
false, // immediate
BasicProperties { ..Default::default() },
serde_json::to_string(&plasticheartbeat::PlasticHeartbeat {})
.unwrap()
.into_bytes(),
)
.unwrap();
let worker = move || {
hbchan
.basic_consume(
worker::new(PlasticHeartbeatWorker { queue_name: (&queue_name).clone() }),
queue_name,
String::from(format!("{}-heartbeat", consumer_name)),
false,
false,
false,
false,
Table::new(),
)
.unwrap();
hbchan.start_consuming();
println!("PlasticHeartbeat failed");
process::exit(1);
};
thread::spawn(worker);
}

View file

@ -1,5 +1,4 @@
pub mod heartbeat;
pub mod build; pub mod build;
pub mod massrebuilder; pub mod massrebuilder;
pub mod githubcommentfilter; pub mod githubcommentfilter;