feat(statcheck): introduce status & checks server
This is a basic server that returns mocked data. Next steps are persistence, client support in the rest of the code, etc. Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
This commit is contained in:
parent
14e79573d3
commit
4b0cf86ef7
8 changed files with 239 additions and 85 deletions
12
Cargo.lock
generated
12
Cargo.lock
generated
|
@ -387,6 +387,7 @@ checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f"
|
|||
dependencies = [
|
||||
"async-trait",
|
||||
"axum-core",
|
||||
"axum-macros",
|
||||
"bytes",
|
||||
"futures-util",
|
||||
"http 1.2.0",
|
||||
|
@ -434,6 +435,17 @@ dependencies = [
|
|||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "axum-macros"
|
||||
version = "0.4.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "backtrace"
|
||||
version = "0.3.74"
|
||||
|
|
|
@ -11,7 +11,7 @@ edition = "2018"
|
|||
[dependencies]
|
||||
async-stream = "0.3.6"
|
||||
async-trait = "0.1.83"
|
||||
axum = "0.7.8"
|
||||
axum = { version = "0.7.8", features = ["macros"] }
|
||||
base64 = "0.22.1"
|
||||
brace-expand = "0.1.0"
|
||||
chrono = "0.4.38"
|
||||
|
|
119
ofborg/src/bin/statcheck-web.rs
Normal file
119
ofborg/src/bin/statcheck-web.rs
Normal file
|
@ -0,0 +1,119 @@
|
|||
use axum::{
|
||||
extract::State,
|
||||
routing::{get, put},
|
||||
Json, Router,
|
||||
};
|
||||
use ofborg::config::Config;
|
||||
use ofborg::web::statcheck;
|
||||
use serde::Serialize;
|
||||
use std::{env, net::SocketAddr, os::unix::io::FromRawFd, sync::Arc};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
// --- Entry point ---
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: evaluation-filter <config>");
|
||||
let cfg = ofborg::config::load(arg.as_ref());
|
||||
|
||||
let shared_config = Arc::new(cfg);
|
||||
|
||||
// Build the app router
|
||||
let app = Router::new()
|
||||
.route("/health", get(health_check))
|
||||
.route("/config", get(config_check))
|
||||
.route(
|
||||
"/changes/:change_id/statuses",
|
||||
get(statcheck::get_statuses).put(statcheck::put_statuses),
|
||||
)
|
||||
.route(
|
||||
"/changes/:change_id/statuses/:status_id",
|
||||
get(statcheck::get_status).patch(statcheck::patch_status),
|
||||
)
|
||||
.route(
|
||||
"/changes/:change_id/checks/:check_id",
|
||||
get(statcheck::get_check).patch(statcheck::patch_check),
|
||||
)
|
||||
.route("/changes/:change_id/checks", put(statcheck::put_checks))
|
||||
.with_state(shared_config);
|
||||
|
||||
// Check for systemd socket activation
|
||||
if let Some(listener) = get_systemd_listener() {
|
||||
tracing::info!("Running with systemd socket activation");
|
||||
axum::serve(listener, app.into_make_service())
|
||||
.await
|
||||
.expect("Failed to serve");
|
||||
} else {
|
||||
// Fallback to manual address for testing
|
||||
let host = env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
|
||||
let port = env::var("PORT")
|
||||
.unwrap_or_else(|_| "8000".to_string())
|
||||
.parse::<u16>()
|
||||
.expect("Invalid port number");
|
||||
|
||||
let addr = SocketAddr::new(host.parse().expect("Invalid host"), port);
|
||||
tracing::info!("Running on http://{}", addr);
|
||||
|
||||
let listener = TcpListener::bind(addr)
|
||||
.await
|
||||
.expect("Failed to bind on the provided socket address");
|
||||
|
||||
axum::serve(listener, app)
|
||||
.await
|
||||
.expect("Failed to bind server");
|
||||
}
|
||||
}
|
||||
|
||||
// --- Route Handlers ---
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct HealthStatus {
|
||||
status: String,
|
||||
}
|
||||
|
||||
/// Health check endpoint
|
||||
async fn health_check() -> Json<HealthStatus> {
|
||||
Json(HealthStatus {
|
||||
status: "OK".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct ConfigStatus {
|
||||
version: String,
|
||||
environment: String,
|
||||
gerrit_instance: Option<String>,
|
||||
// TODO: add ongoing_statuses as a simple counter?
|
||||
}
|
||||
|
||||
/// Config endpoint
|
||||
async fn config_check(State(config): State<Arc<Config>>) -> Json<ConfigStatus> {
|
||||
Json(ConfigStatus {
|
||||
version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
environment: "production".to_string(),
|
||||
gerrit_instance: config.gerrit.as_ref().map(|g| g.instance_uri.clone()),
|
||||
})
|
||||
}
|
||||
|
||||
/// Try to retrieve a listener from systemd socket activation
|
||||
fn get_systemd_listener() -> Option<tokio::net::TcpListener> {
|
||||
if let Ok(listen_fds) = env::var("LISTEN_FDS") {
|
||||
let listen_fds: i32 = listen_fds.parse().ok()?;
|
||||
let fd_offset = 3; // File descriptors start at 3 in systemd
|
||||
if listen_fds > 0 {
|
||||
// Use the first systemd-provided file descriptor
|
||||
let fd = fd_offset;
|
||||
println!("Using systemd file descriptor: {}", fd);
|
||||
unsafe {
|
||||
let std_listener = std::net::TcpListener::from_raw_fd(fd);
|
||||
std_listener.set_nonblocking(true).ok()?;
|
||||
let listener = TcpListener::from_std(std_listener).ok()?;
|
||||
return Some(listener);
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
/// Statuses and checks worker
|
||||
/// - will keep a database of changes
|
||||
/// - their statuses
|
||||
/// - their checks
|
||||
/// - is VCS/CI agnostic
|
||||
use std::env;
|
||||
use std::error::Error;
|
||||
|
||||
use ofborg::config;
|
||||
use ofborg::easyamqp;
|
||||
use ofborg::easyamqp::ChannelExt;
|
||||
use ofborg::easyamqp::ConsumerExt;
|
||||
use ofborg::easylapin;
|
||||
use ofborg::tasks;
|
||||
use tracing::info;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
ofborg::setup_log();
|
||||
|
||||
let arg = env::args()
|
||||
.nth(1)
|
||||
.expect("usage: statcheck-worker <config>");
|
||||
let cfg = config::load(arg.as_ref());
|
||||
|
||||
let conn = easylapin::from_config(&cfg.rabbitmq).await?;
|
||||
let mut chan = conn.create_channel().await?;
|
||||
|
||||
// an RPC queue for verbs
|
||||
let api_queue_name = "statcheck-api".to_owned();
|
||||
// an event queue to be notified about statuses & checks changes.
|
||||
let event_queue_name = "statcheck-events".to_owned();
|
||||
|
||||
chan.declare_exchange(easyamqp::ExchangeConfig {
|
||||
exchange: api_queue_name.clone(),
|
||||
exchange_type: easyamqp::ExchangeType::Topic,
|
||||
passive: false,
|
||||
durable: true,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
internal: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
chan.declare_queue(easyamqp::QueueConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
passive: false,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
auto_delete: false,
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
chan.bind_queue(easyamqp::BindQueueConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
exchange: api_queue_name.clone(),
|
||||
routing_key: None,
|
||||
no_wait: false,
|
||||
})
|
||||
.await?;
|
||||
|
||||
info!("Waiting for API calls on {}", api_queue_name);
|
||||
info!("Notifying of new changes on {}", event_queue_name);
|
||||
|
||||
easylapin::WorkerChannel(chan)
|
||||
.consume(
|
||||
tasks::status_check_collector::StatusCheckCollector::new(cfg.statcheck.clone().db),
|
||||
easyamqp::ConsumeConfig {
|
||||
queue: api_queue_name.clone(),
|
||||
consumer_tag: format!("{}-{}", cfg.whoami(), api_queue_name),
|
||||
no_local: false,
|
||||
no_ack: false,
|
||||
no_wait: false,
|
||||
exclusive: false,
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(conn); // Close connection.
|
||||
info!("Closed the session... EOF");
|
||||
Ok(())
|
||||
}
|
|
@ -49,6 +49,7 @@ pub mod tasks;
|
|||
pub mod test_scratch;
|
||||
pub mod utils;
|
||||
pub mod vcs;
|
||||
pub mod web;
|
||||
pub mod worker;
|
||||
pub mod writetoline;
|
||||
|
||||
|
@ -74,6 +75,7 @@ pub mod ofborg {
|
|||
pub use crate::tasks;
|
||||
pub use crate::test_scratch;
|
||||
pub use crate::vcs;
|
||||
pub use crate::web;
|
||||
pub use crate::worker;
|
||||
pub use crate::writetoline;
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ pub enum Conclusion {
|
|||
ActionRequired,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, PartialEq)]
|
||||
#[derive(Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct CheckRunOptions {
|
||||
pub name: String,
|
||||
pub head_sha: String,
|
||||
|
|
1
ofborg/src/web/mod.rs
Normal file
1
ofborg/src/web/mod.rs
Normal file
|
@ -0,0 +1 @@
|
|||
pub mod statcheck;
|
103
ofborg/src/web/statcheck/mod.rs
Normal file
103
ofborg/src/web/statcheck/mod.rs
Normal file
|
@ -0,0 +1,103 @@
|
|||
use axum::{extract::Path, Json};
|
||||
|
||||
use crate::{
|
||||
message::Repo,
|
||||
vcs::generic::{CheckRunOptions, CheckRunState},
|
||||
};
|
||||
|
||||
/// This contains the web code for the status & checks server.
|
||||
|
||||
// TODO: how to do code reuse with the other structure that contains an API handle?
|
||||
#[allow(dead_code)]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct CommitStatus {
|
||||
repo: Repo,
|
||||
sha: String,
|
||||
context: String,
|
||||
description: String,
|
||||
url: String,
|
||||
}
|
||||
|
||||
/// Handler for GET /changes/:change_id/statuses
|
||||
pub async fn get_statuses(Path(_change_id): Path<String>) -> Json<Vec<CommitStatus>> {
|
||||
// TODO: Retrieve the statuses from the data store
|
||||
Json(vec![]) // Return an empty list for now
|
||||
}
|
||||
|
||||
/// Handler for GET /changes/:change_id/statuses/:status_id
|
||||
pub async fn get_status(
|
||||
Path((_change_id, _status_id)): Path<(String, String)>,
|
||||
) -> Json<CommitStatus> {
|
||||
// TODO: Retrieve a specific status from the data store
|
||||
Json(CommitStatus {
|
||||
repo: Repo {
|
||||
owner: "example".to_string(),
|
||||
name: "repo".to_string(),
|
||||
full_name: "example/repo".to_string(),
|
||||
clone_url: "https://example.com/repo.git".to_string(),
|
||||
},
|
||||
sha: "example_sha".to_string(),
|
||||
context: "example_context".to_string(),
|
||||
description: "example_description".to_string(),
|
||||
url: "https://example.com/status".to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Handler for PUT /changes/:change_id/statuses
|
||||
pub async fn put_statuses(
|
||||
Path(change_id): Path<String>,
|
||||
Json(_payload): Json<CommitStatus>,
|
||||
) -> Json<String> {
|
||||
// TODO: Add the status to the data store
|
||||
Json(format!("Added status for change_id {}", change_id))
|
||||
}
|
||||
|
||||
/// Handler for PATCH /changes/:change_id/statuses/:status_id
|
||||
pub async fn patch_status(
|
||||
Path((change_id, status_id)): Path<(String, String)>,
|
||||
Json(_payload): Json<CommitStatus>,
|
||||
) -> Json<String> {
|
||||
// TODO: Update the status in the data store
|
||||
Json(format!(
|
||||
"Updated status_id {} for change_id {}",
|
||||
status_id, change_id
|
||||
))
|
||||
}
|
||||
|
||||
/// Handler for GET /changes/:change_id/checks/:check_id
|
||||
pub async fn get_check(
|
||||
Path((_change_id, _check_id)): Path<(String, String)>,
|
||||
) -> Json<CheckRunOptions> {
|
||||
// TODO: Retrieve a specific check from the data store
|
||||
Json(CheckRunOptions {
|
||||
name: "example_check".to_string(),
|
||||
head_sha: "example_sha".to_string(),
|
||||
details_url: Some("https://example.com/details".to_string()),
|
||||
external_id: Some("external_id".to_string()),
|
||||
status: Some(CheckRunState::Running),
|
||||
started_at: Some("2024-12-17T00:00:00Z".to_string()),
|
||||
conclusion: None,
|
||||
completed_at: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Handler for PUT /changes/:change_id/checks
|
||||
pub async fn put_checks(
|
||||
Path(change_id): Path<String>,
|
||||
Json(_payload): Json<CheckRunOptions>,
|
||||
) -> Json<String> {
|
||||
// TODO: Add the check to the data store
|
||||
Json(format!("Added check for change_id {}", change_id))
|
||||
}
|
||||
|
||||
/// Handler for PATCH /changes/:change_id/checks/:check_id
|
||||
pub async fn patch_check(
|
||||
Path((change_id, check_id)): Path<(String, String)>,
|
||||
Json(_payload): Json<CheckRunOptions>,
|
||||
) -> Json<String> {
|
||||
// TODO: Update the check in the data store
|
||||
Json(format!(
|
||||
"Updated check_id {} for change_id {}",
|
||||
check_id, change_id
|
||||
))
|
||||
}
|
Loading…
Reference in a new issue