339 lines
12 KiB
Rust
339 lines
12 KiB
Rust
|
use std::{collections::HashMap, path::PathBuf};
|
||
|
use anyhow::Context;
|
||
|
use bytes::Bytes;
|
||
|
use futures::Stream;
|
||
|
use indicatif::ProgressBar;
|
||
|
use nix_compat::store_path::{StorePath, StorePathRef};
|
||
|
use regex::Regex;
|
||
|
|
||
|
use serde::{de::Error, Deserialize, Deserializer};
|
||
|
use tokio::io::AsyncWrite;
|
||
|
|
||
|
use crate::{actions::HydraProductType, config::MirrorConfig};
|
||
|
|
||
|
pub type ReleaseId = u64;
|
||
|
pub type EvaluationId = u64;
|
||
|
pub type BuildId = u64;
|
||
|
|
||
|
fn deser_bool_as_int<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
||
|
where D: Deserializer<'de>
|
||
|
{
|
||
|
u8::deserialize(deserializer).map(|b| if b == 1 { true } else { false })
|
||
|
}
|
||
|
|
||
|
fn deser_bool_as_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
||
|
where D: Deserializer<'de>
|
||
|
{
|
||
|
let s: &str = Deserialize::deserialize(deserializer)?;
|
||
|
|
||
|
match s {
|
||
|
"false" => Ok(false),
|
||
|
"true" => Ok(true),
|
||
|
_ => Err(Error::unknown_variant(s, &["false", "true"]))
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[derive(Debug, Deserialize)]
|
||
|
pub struct OutputPath {
|
||
|
#[serde(rename="path")]
|
||
|
store_path: String
|
||
|
}
|
||
|
|
||
|
#[derive(Debug, Deserialize)]
|
||
|
pub struct BuildProduct {
|
||
|
#[serde(rename="path")]
|
||
|
store_path: String,
|
||
|
name: String,
|
||
|
subtype: String,
|
||
|
sha256hash: String,
|
||
|
r#type: String,
|
||
|
#[serde(rename="filesize")]
|
||
|
file_size: u64
|
||
|
}
|
||
|
|
||
|
#[derive(Debug, Deserialize)]
|
||
|
pub struct BuildInfo {
|
||
|
jobset: String,
|
||
|
job: String,
|
||
|
project: String,
|
||
|
#[serde(deserialize_with = "deser_bool_as_int", rename="buildstatus")]
|
||
|
finished: bool,
|
||
|
#[serde(rename="releasename")]
|
||
|
release_name: Option<String>,
|
||
|
#[serde(rename="stoptime")]
|
||
|
stop_time: u64,
|
||
|
#[serde(rename="starttime")]
|
||
|
start_time: u64,
|
||
|
system: String,
|
||
|
id: BuildId,
|
||
|
#[serde(rename="buildoutputs")]
|
||
|
build_outputs: HashMap<String, OutputPath>,
|
||
|
#[serde(rename="buildproducts")]
|
||
|
build_products: HashMap<String, BuildProduct>,
|
||
|
#[serde(rename="nixname")]
|
||
|
nix_name: String,
|
||
|
#[serde(rename="drvpath")]
|
||
|
drv_path: String,
|
||
|
}
|
||
|
|
||
|
#[derive(Debug, Clone)]
|
||
|
pub struct Channel {
|
||
|
pub name: String
|
||
|
}
|
||
|
|
||
|
#[non_exhaustive]
|
||
|
pub enum ChannelType {
|
||
|
NixOS,
|
||
|
Nixpkgs,
|
||
|
}
|
||
|
|
||
|
#[non_exhaustive]
|
||
|
pub enum ChannelVariant {
|
||
|
Normal,
|
||
|
Small
|
||
|
}
|
||
|
|
||
|
impl Channel {
|
||
|
pub fn version(&self) -> String {
|
||
|
let re = Regex::new("([a-z]+)-(?<ver>.*)").unwrap();
|
||
|
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
|
||
|
|
||
|
caps["ver"].to_string()
|
||
|
}
|
||
|
|
||
|
pub fn prefix(&self) -> String {
|
||
|
let re = Regex::new("(?<name>[a-z]+)-(.*)").unwrap();
|
||
|
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
|
||
|
|
||
|
caps["name"].to_string()
|
||
|
}
|
||
|
|
||
|
// TODO: just regex match?
|
||
|
pub fn r#type(&self) -> ChannelType {
|
||
|
ChannelType::NixOS
|
||
|
}
|
||
|
|
||
|
pub fn variant(&self) -> ChannelVariant {
|
||
|
ChannelVariant::Normal
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[derive(Deserialize, Debug)]
|
||
|
pub struct Release {
|
||
|
pub id: ReleaseId,
|
||
|
job: String,
|
||
|
#[serde(rename = "releasename")]
|
||
|
release_name: Option<String>,
|
||
|
#[serde(rename = "starttime")]
|
||
|
start_time: u64,
|
||
|
#[serde(rename = "stoptime")]
|
||
|
stop_time: u64,
|
||
|
#[serde(rename = "nixname")]
|
||
|
pub nix_name: String,
|
||
|
#[serde(rename = "jobsetevals")]
|
||
|
jobset_evals: Vec<EvaluationId>,
|
||
|
jobset: String,
|
||
|
#[serde(deserialize_with = "deser_bool_as_int")]
|
||
|
finished: bool,
|
||
|
priority: u64,
|
||
|
system: String,
|
||
|
timestamp: u64,
|
||
|
project: String,
|
||
|
#[serde(rename = "drvpath")]
|
||
|
derivation_path: String,
|
||
|
// ignored: buildproducts, buildoutputs, buildmetrics, buildstatus
|
||
|
}
|
||
|
|
||
|
#[derive(Debug)]
|
||
|
pub struct GitInput {
|
||
|
uri: String,
|
||
|
revision: String
|
||
|
}
|
||
|
|
||
|
// FIXME(Raito): for some reason, #[serde(tag = "type"), rename_all = "lowercase"] doesn't behave
|
||
|
// correctly, and causes deserialization failures in practice. `untagged` is suboptimal but works
|
||
|
// because of the way responses works...
|
||
|
#[derive(Debug, Deserialize)]
|
||
|
#[serde(untagged, expecting = "An valid jobset input")]
|
||
|
pub enum Input {
|
||
|
Boolean {
|
||
|
#[serde(deserialize_with = "deser_bool_as_string")]
|
||
|
value: bool
|
||
|
},
|
||
|
Git { uri: String, revision: String },
|
||
|
Nix { value: String },
|
||
|
}
|
||
|
|
||
|
#[derive(Deserialize, Debug)]
|
||
|
pub struct Evaluation {
|
||
|
pub id: EvaluationId,
|
||
|
#[serde(rename="checkouttime")]
|
||
|
checkout_time: u64,
|
||
|
#[serde(rename="evaltime")]
|
||
|
eval_time: u64,
|
||
|
flake: Option<String>,
|
||
|
#[serde(rename="jobsetevalinputs", default)]
|
||
|
pub jobset_eval_inputs: HashMap<String, Input>,
|
||
|
timestamp: u64,
|
||
|
builds: Vec<BuildId>,
|
||
|
}
|
||
|
|
||
|
impl Release {
|
||
|
pub fn version(&self) -> String {
|
||
|
let re = Regex::new(".+-(?<ver>[0-9].+)").unwrap();
|
||
|
let caps = re.captures(&self.nix_name).expect("Failed to parse the release name");
|
||
|
|
||
|
caps["ver"].to_string()
|
||
|
}
|
||
|
|
||
|
pub fn evaluation_id(&self) -> u64 {
|
||
|
*self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?")
|
||
|
}
|
||
|
|
||
|
pub fn evaluation_url(&self, hydra_base_uri: &str) -> String {
|
||
|
let eval_id = self.evaluation_id();
|
||
|
format!("{}/eval/{}", hydra_base_uri, eval_id)
|
||
|
}
|
||
|
|
||
|
pub fn job_url(&self, hydra_base_uri: &str, job_name: &str) -> String {
|
||
|
let eval_id = self.evaluation_id();
|
||
|
format!("{}/eval/{}/job/{}", hydra_base_uri, eval_id, job_name)
|
||
|
}
|
||
|
|
||
|
pub fn store_paths_url(&self, hydra_base_uri: &str) -> String {
|
||
|
let eval_id = self.evaluation_id();
|
||
|
format!("{}/eval/{}/store-paths", hydra_base_uri, eval_id)
|
||
|
}
|
||
|
|
||
|
/// Directory related to this release.
|
||
|
fn directory(&self, channel: &Channel) -> String {
|
||
|
match channel.name.as_str() {
|
||
|
"nixpkgs-unstable" => "nixpkgs".to_string(),
|
||
|
_ => format!("{}/{}", channel.prefix(), channel.version())
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub fn prefix(&self, channel: &Channel) -> object_store::path::Path {
|
||
|
format!("{}/{}", self.directory(channel), self.nix_name).into()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
pub fn release_uri(hydra_uri: &str, job_name: &str) -> String {
|
||
|
format!("{}/job/{}/latest", hydra_uri, job_name)
|
||
|
}
|
||
|
|
||
|
#[derive(Debug)]
|
||
|
pub struct HydraClient<'a> {
|
||
|
pub config: &'a MirrorConfig,
|
||
|
}
|
||
|
|
||
|
impl HydraClient<'_> {
|
||
|
pub async fn fetch_release(&self, job_name: &str) -> reqwest::Result<Release> {
|
||
|
let client = reqwest::Client::new();
|
||
|
let resp = client.get(release_uri(&self.config.hydra_uri, job_name))
|
||
|
.header("Accept", "application/json")
|
||
|
// TODO: put a proper version
|
||
|
.header("User-Agent", "nixos-channel-scripts (rust)")
|
||
|
.send()
|
||
|
.await?;
|
||
|
|
||
|
resp.json().await
|
||
|
}
|
||
|
|
||
|
pub async fn fetch_evaluation(&self, release: &Release) -> reqwest::Result<Evaluation> {
|
||
|
let client = reqwest::Client::new();
|
||
|
let resp = client.get(release.evaluation_url(&self.config.hydra_uri))
|
||
|
.header("Accept", "application/json")
|
||
|
.header("User-Agent", "nixos-channel-scripts (rust)")
|
||
|
.send()
|
||
|
.await?;
|
||
|
|
||
|
resp.json().await
|
||
|
}
|
||
|
|
||
|
pub async fn fetch_store_paths(&self, release: &Release) -> anyhow::Result<Vec<StorePath<String>>> {
|
||
|
let client = reqwest::Client::new();
|
||
|
|
||
|
Ok(client.get(release.store_paths_url(&self.config.hydra_uri))
|
||
|
.header("Accept", "application/json")
|
||
|
.header("User-Agent", "nixos-channel-scripts (rust)")
|
||
|
.send()
|
||
|
.await?
|
||
|
.json()
|
||
|
.await
|
||
|
.context(format!("while downloading store-paths information for an evaluation"))?
|
||
|
)
|
||
|
}
|
||
|
|
||
|
pub async fn hydra_product_bytes_stream(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
|
||
|
let client = reqwest::Client::new();
|
||
|
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
|
||
|
.header("Accept", "application/json")
|
||
|
.header("User-Agent", "nixos-channel-scripts (rust)")
|
||
|
.send()
|
||
|
.await?
|
||
|
.json()
|
||
|
.await
|
||
|
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
|
||
|
|
||
|
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
|
||
|
for (_, product) in build_info.build_products {
|
||
|
if products_by_subtype.contains_key(&product.subtype) {
|
||
|
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
|
||
|
}
|
||
|
|
||
|
products_by_subtype.insert(product.subtype.clone(), product);
|
||
|
}
|
||
|
|
||
|
if products_by_subtype.len() > 1 && product_type.is_none() {
|
||
|
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
|
||
|
}
|
||
|
|
||
|
let product: BuildProduct = if let Some(ptype) = product_type {
|
||
|
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
|
||
|
} else {
|
||
|
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
|
||
|
};
|
||
|
|
||
|
// 3. FIXME: validate sha256hash during read?
|
||
|
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
|
||
|
crate::nar::file_in_nar_bytes_stream(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap()).await.context("while copying the NAR to the target")
|
||
|
}
|
||
|
|
||
|
pub async fn copy_hydra_product<W: AsyncWrite + Unpin>(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
|
||
|
// TODO: dry me?
|
||
|
let client = reqwest::Client::new();
|
||
|
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
|
||
|
.header("Accept", "application/json")
|
||
|
.header("User-Agent", "nixos-channel-scripts (rust)")
|
||
|
.send()
|
||
|
.await?
|
||
|
.json()
|
||
|
.await
|
||
|
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
|
||
|
|
||
|
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
|
||
|
for (_, product) in build_info.build_products {
|
||
|
if products_by_subtype.contains_key(&product.subtype) {
|
||
|
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
|
||
|
}
|
||
|
|
||
|
products_by_subtype.insert(product.subtype.clone(), product);
|
||
|
}
|
||
|
|
||
|
if products_by_subtype.len() > 1 && product_type.is_none() {
|
||
|
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
|
||
|
}
|
||
|
|
||
|
let product: BuildProduct = if let Some(ptype) = product_type {
|
||
|
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
|
||
|
} else {
|
||
|
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
|
||
|
};
|
||
|
|
||
|
// 3. FIXME: validate sha256hash during read?
|
||
|
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
|
||
|
crate::nar::copy_file_in_nar(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap(), pbar, out).await.context("while copying the NAR to the target")
|
||
|
}
|
||
|
}
|