feat: make the AMQP based event streamer

This is inspired from the debug listener but pipes things into the
exchange with `abc` as a routing key (should be fixed!!).

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
raito 2024-11-14 22:51:33 +01:00
parent 42546e5bc7
commit f68e332c1f

View file

@ -6,8 +6,11 @@ use std::error::Error;
use async_std::stream::StreamExt;
use async_std::task;
use futures_util::TryStreamExt;
use futures::pin_mut;
use lapin::options::BasicPublishOptions;
use lapin::BasicProperties;
use ofborg::vcs::gerrit::ssh::GerritSSHApi;
use ofborg::worker::prepare_queue_message;
use tracing::info;
use ofborg::config;
@ -19,7 +22,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let arg = env::args()
.nth(1)
.expect("usage: gerrit-events-filter <config>");
.expect("usage: gerrit-events-streamer <config>");
let cfg = config::load(arg.as_ref());
let conn = easylapin::from_config(&cfg.rabbitmq)?;
@ -42,16 +45,54 @@ fn main() -> Result<(), Box<dyn Error>> {
let gerrit_cfg = cfg
.gerrit
.expect("Gerrit event streamer requires Gerrit configuration");
let gerrit_api = GerritSSHApi::new(
let mut gerrit_api = GerritSSHApi::new(
gerrit_cfg.ssh_private_key_file,
&format!("ssh://{}:{}", gerrit_cfg.instance_uri, gerrit_cfg.ssh_port),
);
let routing_key = "abc";
task::block_on(async {
while let Some(event) = gerrit_api.stream_events().await?.try_next().await? {
//chan.basic_publish();
// publish the event in the exchange!
todo!();
let event_stream = gerrit_api.stream_events().await.unwrap();
pin_mut!(event_stream);
loop {
let raw_evt = event_stream.next().await;
tracing::debug!("{:?}", raw_evt);
match raw_evt {
Some(Ok(event)) => {
println!("{:#?}", event);
let queue_message =
prepare_queue_message(Some(exchange_name), Some(routing_key), &event);
let props = BasicProperties::default()
.with_delivery_mode(2)
.with_content_type("application/json".into());
match chan
.basic_publish(
exchange_name,
routing_key,
BasicPublishOptions::default(),
&queue_message.content,
props,
)
.await
{
Ok(_confirmation) => {
tracing::debug!("Gerrit event published in the exchange");
}
Err(err) => {
tracing::error!("Failed to publish gerrit event: {}", err);
}
}
}
Some(Err(_err)) => {
// notify the event
}
None => {
// notify the event
}
}
}
});