feat(*): init Rust port

This is a Rust port of the original Perl script. Legacy cruft has been removed, and the port targets a modern Hydra deployment. Nonetheless, it knows how to perform migrations based on the channel versions.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
parent 809d960f49
commit 65358e64c0

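For orientation, based on the clap definitions added in src/main.rs below, an invocation of the new tool might look like the following. The binary name, channel name, and job name are illustrative placeholders and are not taken from this commit:

    RUST_LOG=trace nixos-channel-scripts --config-file forkos.toml plan <channel-name> <job-name>
    nixos-channel-scripts --config-file forkos.toml apply <channel-name> <job-name>   # Apply is still todo!()

Logging goes through pretty_env_logger, so verbosity is controlled with the RUST_LOG environment variable.
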
.gitignore (vendored): 5 lines changed
@@ -1 +1,6 @@
 result
+
+
+# Added by cargo
+
+/target

Cargo.lock (generated, new file): 2663 lines
(File diff suppressed because it is too large.)

Cargo.toml (new file): 24 lines
@@ -0,0 +1,24 @@
[package]
name = "nixos-channel-scripts"
version = "0.1.0"
edition = "2021"

[dependencies]
clap = { version = "4.5.13", features = [ "derive" ] }
indicatif = { version = "0.17.8", features = ["futures", "tokio"] }
log = "0.4.22"
object_store = { version = "0.10.2", features = [ "aws" ] }
pretty_env_logger = "0.5.0"
regex = "1.10.6"
reqwest = { version = "0.12.5", features = ["brotli"] }
serde = "1.0.204"
tempdir = "0.3.7"
textwrap = "0.16.1"
tokio = { version = "1.39.2", features = ["full"] }
toml = "0.8.19"
nix-compat = { git = "https://github.com/tvlfyi/tvix" }
futures = "0.3.30"
anyhow = "1.0.86"
bytes = "1.7.1"
async-compression = { version = "0.4.12", features = ["tokio", "zstd"] }
tokio-util = { version = "0.7.11", features = ["io"] }

default.nix: 14 lines changed
@@ -1,3 +1,11 @@
-(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
-  src = builtins.fetchGit ./.;
-}).defaultNix
+{ pkgs ? import <nixpkgs> {} }:
+{
+  shell = pkgs.mkShell {
+    buildInputs = [
+      pkgs.cargo
+      pkgs.rustc
+      pkgs.openssl
+      pkgs.pkg-config
+    ];
+  };
+}

forkos.toml (new file): 5 lines
@@ -0,0 +1,5 @@
hydra_uri = "https://hydra.forkos.org"
binary_cache_uri = "https://cache.forkos.org"
nixpkgs_dir = "/var/lib/nixpkgs"
s3_release_bucket_uri = "s3://cache.forkos.org/release"
s3_channel_bucket_uri = "s3://cache.forkos.org/channel"

(deleted file): 3 lines removed
@@ -1,3 +0,0 @@
-(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
-  src = builtins.fetchGit ./.;
-}).shellNix

src/actions.rs (new file): 257 lines
@@ -0,0 +1,257 @@
use std::path::PathBuf;
use anyhow::Context;
use async_compression::tokio::write::ZstdEncoder;
use futures::future::join_all;
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressDrawTarget;
use log::debug;
use tempdir::TempDir;
use tokio::fs::File;
use crate::hydra;
use crate::config;

#[derive(Debug, Copy, Clone)]
pub enum HydraProductType {
    BrotliJson,
    SourceDistribution,
}

#[derive(Debug)]
pub enum Compression {
    Zstd,
    Brotli,
    None
}

impl Compression {
    pub fn http_encoding(&self) -> Option<&str> {
        Some(match self {
            Self::Zstd => "zstd",
            Self::Brotli => "br",
            Self::None => return None,
        })
    }

    pub fn file_extension(&self) -> Option<&str> {
        Some(match self {
            Self::Zstd => "zst",
            Self::Brotli => "br",
            Self::None => return None,
        })
    }
}

impl ToString for HydraProductType {
    fn to_string(&self) -> String {
        match self {
            Self::BrotliJson => "json-br".to_string(),
            Self::SourceDistribution => "source-dist".to_string(),
        }
    }
}

#[derive(Debug)]
pub enum Action {
    WriteFile {
        dst_path: PathBuf,
        contents: String
    },
    WriteHydraProduct {
        product_name: String,
        dst_path: Option<PathBuf>,
        product_type: Option<HydraProductType>,
    },
    WriteCompressedStorePaths {
        compression: Compression
    }
}

#[derive(Debug)]
pub struct Plan {
    pub actions: Vec<Action>,
    pub release: hydra::Release,
    pub channel_name: String,
    pub target_bucket: String
}

macro_rules! plan_write_file {
    ($plan_container:ident, $dst_path:literal, $contents:expr) => {
        $plan_container.push(Action::WriteFile {
            dst_path: $dst_path.into(),
            contents: $contents
        });
    }
}

macro_rules! plan_write_hydra_product {
    ($plan_container:ident, $pname:expr) => {
        $plan_container.push(Action::WriteHydraProduct {
            product_name: $pname.into(),
            dst_path: None,
            product_type: None
        });
    };

    ($plan_container:ident, $dst_path:literal, $pname:expr) => {
        $plan_container.push(Action::WriteHydraProduct {
            dst_path: Some($dst_path.into()),
            product_name: $pname.into(),
            product_type: None
        });
    };

    ($plan_container:ident, $dst_path:literal, $pname:expr, $ptype:expr) => {
        $plan_container.push(Action::WriteHydraProduct {
            dst_path: Some($dst_path.into()),
            product_name: $pname.into(),
            product_type: Some($ptype)
        });
    };
}

macro_rules! plan_iso {
    ($plan_container:ident, $iso_name:literal, $arch:literal) => {
        plan_write_hydra_product!($plan_container, format!("nixos.iso_{}.{}", $iso_name, $arch));
    }
}

pub async fn generate_plan(config: &config::MirrorConfig, channel: &hydra::Channel, release: &hydra::Release, evaluation: hydra::Evaluation) -> Vec<Action> {
    let mut plan = vec![];

    /*plan.push(Action::WriteCompressedStorePaths {
        compression: Compression::Zstd
    });*/

    if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").unwrap() {
        plan_write_file!(plan, "src-url", release.evaluation_url(&config.hydra_uri));
        plan_write_file!(plan, "git-revision", revision.clone());
        plan_write_file!(plan, "binary-cache-url", config.binary_cache_uri.clone());

        if matches!(channel.r#type(), hydra::ChannelType::NixOS) {
            plan_write_hydra_product!(plan, "nixexprs.tar.xz", "nixos.channel");
            plan_write_hydra_product!(plan, "packages.json.br", "nixpkgs.tarball",
                HydraProductType::BrotliJson
            );
            plan_write_hydra_product!(plan, "options.json.br", "nixos.options",
                HydraProductType::BrotliJson
            );

            // TODO: we don't have aarch64-linux yet
            //plan_iso!(plan, "minimal", "aarch64-linux");
            plan_iso!(plan, "minimal", "x86_64-linux");

            if !matches!(channel.variant(), hydra::ChannelVariant::Small) {
                // TODO: for some reason, those ISOs are not present in ForkOS hydra.
                // TODO: implement error recovery, skipping or failure?
                // maybe encode this in the plan (fallible, etc.)
                // TODO: compat with nixos-20, 21, 22, 23 using plasma5?
                // should we have plasma_latest?
                //plan_iso!(plan, "plasma6", "aarch64-linux");
                //plan_iso!(plan, "plasma6", "x86_64-linux");

                //plan_iso!(plan, "gnome", "aarch64-linux");
                //plan_iso!(plan, "gnome", "x86_64-linux");

                // TODO: i686-linux ISO compat? for nixos ≤ 23.11

                // FIXME: weird OVA appliance here, should investigate what to do
                //plan_write_hydra_product!(plan, "nixos.ova.x86_64-linux");
            }
        } else {
            plan_write_hydra_product!(plan, "nixexprs.tar.xz", "tarball",
                HydraProductType::SourceDistribution);
            plan_write_hydra_product!(plan, "packages.json.br", "tarball",
                HydraProductType::BrotliJson);
        }
    }

    // TODO: take into account the SQLite database & debuginfo

    plan
}

/// Interprets the plan as a list of actions.
///
/// It creates a staging directory where all actions are prepared.
/// No irreversible action is committed.
///
/// At the end, it returns the staging directory for further post-processing, including possible
/// uploads.
///
/// Enabling streaming will also directly stream the contents of the staging directory to the
/// target bucket in a temporary suffix of the target prefix.
/// The caller is responsible for promoting that upload to a finalized upload.
pub async fn interpret_plan(hydra_client: &hydra::HydraClient<'_>, plan: Plan, streaming: bool) -> TempDir {
    if streaming {
        // TODO: streaming impl is just replacing the async write to go directly to the bucket
        // returning a tempdir is useless now as all the contents is already remote?
        // what should become the return type?
        todo!("Streaming the plan is not supported yet");
    }

    // Prepare a staging directory
    let staging_directory = TempDir::new(format!("staging-{}", &plan.channel_name).as_str()).expect("Failed to create temporary directory for staging directory");
    debug!("Preparing the update in {}", staging_directory.path().display());

    // Initialize multiple progress bars
    let multi_pbar = MultiProgress::with_draw_target(ProgressDrawTarget::stdout());
    let mut pending = Vec::new();

    for action in plan.actions {
        let pbar = ProgressBar::new(0);
        let pbar = multi_pbar.add(pbar.clone());
        let task = async {
            match action {
                Action::WriteHydraProduct { product_name, dst_path, product_type } => {
                    // spawn two pbs with bytes styles, attach it to the mb
                    // perform an io copy from the async read of the remote download of hydra to the
                    // async write of the IO file behind dst_path
                    // and wrap_async_read + wrap_async_write
                    let dst_path = dst_path.unwrap_or_else(|| product_name.clone().into());
                    assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths need to be relative in order for them to be prefixed by the staging directory");
                    let mut tgt_file = tokio::fs::File::create(staging_directory.path().join(dst_path.clone())).await.unwrap();
                    let copied = hydra_client.copy_hydra_product(&plan.release, &product_name, product_type, pbar, &mut tgt_file).await
                        .context(format!("while writing the hydra product {} ({:?}) to {}", product_name, product_type, dst_path.display()))
                        .expect("Failed to copy a Hydra product");

                    debug!("Copied {} bytes", copied);
                },
                Action::WriteFile { dst_path, contents } => {
                    // We don't need a progress bar here.
                    multi_pbar.remove(&pbar);
                    assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths need to be relative in order for them to be prefixed by the staging directory");
                    let size = contents.len();
                    tokio::fs::write(staging_directory.path().join(dst_path), contents).await.expect("Failed to write a file");
                    debug!("Written {} bytes", size);
                },
                Action::WriteCompressedStorePaths { compression } => {
                    // spawn a pb with remote download from hydra
                    // spawn a pb with write + compression to some compressing writer
                    let mut writer;
                    let tgt_file = tokio::fs::File::create(staging_directory.path().join("store-paths.zst")).await.unwrap();
                    match compression {
                        Compression::Zstd => {
                            writer = ZstdEncoder::new(tgt_file);
                        },
                        Compression::Brotli => todo!(),
                        Compression::None => todo!(),
                    }
                    let written = hydra_client.copy_hydra_product(&plan.release, "store-paths", None, pbar, &mut writer).await
                        .context("while writing compressed store paths")
                        .expect("Failed to write compressed store paths from Hydra");
                    debug!("Written {} bytes of compressed store paths", written);
                },
            };
        };

        pending.push(task);
    }

    join_all(pending).await;

    // join all the futures
    // returns the staging directory
    staging_directory
}

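As a reading aid for the plan macros above: a call such as plan_write_hydra_product!(plan, "nixexprs.tar.xz", "nixos.channel") matches the second macro arm and expands roughly to the following (illustrative sketch only, not part of the commit):

    plan.push(Action::WriteHydraProduct {
        dst_path: Some("nixexprs.tar.xz".into()),
        product_name: "nixos.channel".into(),
        product_type: None
    });

So generate_plan only accumulates declarative Action values; all of the I/O is deferred to interpret_plan.
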
src/config.rs (new file): 33 lines
@@ -0,0 +1,33 @@
use object_store::aws::{AmazonS3, AmazonS3Builder};
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Debug, Deserialize)]
pub struct MirrorConfig {
    /// URI to Hydra instance
    pub hydra_uri: String,
    /// URI to the binary cache
    pub binary_cache_uri: String,
    /// A path to a checkout of nixpkgs
    nixpkgs_dir: PathBuf,
    /// S3 releases bucket URL
    s3_release_bucket_uri: String,
    /// S3 channels bucket URL
    s3_channel_bucket_uri: String,
}

impl MirrorConfig {
    pub fn release_bucket(&self) -> AmazonS3 {
        AmazonS3Builder::from_env()
            .with_bucket_name(&self.s3_release_bucket_uri)
            .build()
            .expect("Failed to connect to the S3 release bucket")
    }

    pub fn channel_bucket(&self) -> AmazonS3 {
        AmazonS3Builder::from_env()
            .with_bucket_name(&self.s3_channel_bucket_uri)
            .build()
            .expect("Failed to connect to the S3 channel bucket")
    }
}

src/git.rs (new, empty file): 1 line
@@ -0,0 +1 @@

src/hydra.rs (new file): 276 lines
@@ -0,0 +1,276 @@
use std::{collections::HashMap, path::PathBuf};
use anyhow::Context;
use indicatif::ProgressBar;
use nix_compat::store_path::StorePathRef;
use regex::Regex;

use serde::{de::Error, Deserialize, Deserializer};
use tokio::io::AsyncWrite;

use crate::{actions::HydraProductType, config::MirrorConfig};

pub type ReleaseId = u64;
pub type EvaluationId = u64;
pub type BuildId = u64;

fn deser_bool_as_int<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
    u8::deserialize(deserializer).map(|b| if b == 1 { true } else { false })
}

fn deser_bool_as_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
    let s: &str = Deserialize::deserialize(deserializer)?;

    match s {
        "false" => Ok(false),
        "true" => Ok(true),
        _ => Err(Error::unknown_variant(s, &["false", "true"]))
    }
}

#[derive(Debug, Deserialize)]
pub struct OutputPath {
    #[serde(rename="path")]
    store_path: String
}

#[derive(Debug, Deserialize)]
pub struct BuildProduct {
    #[serde(rename="path")]
    store_path: String,
    name: String,
    subtype: String,
    sha256hash: String,
    r#type: String,
    #[serde(rename="filesize")]
    file_size: u64
}

#[derive(Debug, Deserialize)]
pub struct BuildInfo {
    jobset: String,
    job: String,
    project: String,
    #[serde(deserialize_with = "deser_bool_as_int", rename="buildstatus")]
    finished: bool,
    #[serde(rename="releasename")]
    release_name: Option<String>,
    #[serde(rename="stoptime")]
    stop_time: u64,
    #[serde(rename="starttime")]
    start_time: u64,
    system: String,
    id: BuildId,
    #[serde(rename="buildoutputs")]
    build_outputs: HashMap<String, OutputPath>,
    #[serde(rename="buildproducts")]
    build_products: HashMap<String, BuildProduct>,
    #[serde(rename="nixname")]
    nix_name: String,
    #[serde(rename="drvpath")]
    drv_path: String,
}

#[derive(Debug)]
pub struct Channel {
    pub name: String
}

#[non_exhaustive]
pub enum ChannelType {
    NixOS,
    Nixpkgs,
}

#[non_exhaustive]
pub enum ChannelVariant {
    Normal,
    Small
}

impl Channel {
    pub fn version(&self) -> String {
        let re = Regex::new("([a-z]+)-(?<ver>.*)").unwrap();
        let caps = re.captures(&self.name).expect("Failed to parse the channel name");

        caps["ver"].to_string()
    }

    pub fn prefix(&self) -> String {
        let re = Regex::new("(?<name>[a-z]+)-(.*)").unwrap();
        let caps = re.captures(&self.name).expect("Failed to parse the channel name");

        caps["name"].to_string()
    }

    // TODO: just regex match?
    pub fn r#type(&self) -> ChannelType {
        ChannelType::NixOS
    }

    pub fn variant(&self) -> ChannelVariant {
        ChannelVariant::Normal
    }
}

#[derive(Deserialize, Debug)]
pub struct Release {
    pub id: ReleaseId,
    job: String,
    #[serde(rename = "releasename")]
    release_name: Option<String>,
    #[serde(rename = "starttime")]
    start_time: u64,
    #[serde(rename = "stoptime")]
    stop_time: u64,
    #[serde(rename = "nixname")]
    pub nix_name: String,
    #[serde(rename = "jobsetevals")]
    jobset_evals: Vec<EvaluationId>,
    jobset: String,
    #[serde(deserialize_with = "deser_bool_as_int")]
    finished: bool,
    priority: u64,
    system: String,
    timestamp: u64,
    project: String,
    #[serde(rename = "drvpath")]
    derivation_path: String,
    // ignored: buildproducts, buildoutputs, buildmetrics, buildstatus
}

#[derive(Debug)]
pub struct GitInput {
    uri: String,
    revision: String
}

// FIXME(Raito): for some reason, #[serde(tag = "type"), rename_all = "lowercase"] doesn't behave
// correctly, and causes deserialization failures in practice. `untagged` is suboptimal but works
// because of the way responses work...
#[derive(Debug, Deserialize)]
#[serde(untagged, expecting = "A valid jobset input")]
pub enum Input {
    Boolean {
        #[serde(deserialize_with = "deser_bool_as_string")]
        value: bool
    },
    Git { uri: String, revision: String },
    Nix { value: String },
}

#[derive(Deserialize, Debug)]
pub struct Evaluation {
    pub id: EvaluationId,
    #[serde(rename="checkouttime")]
    checkout_time: u64,
    #[serde(rename="evaltime")]
    eval_time: u64,
    flake: Option<String>,
    #[serde(rename="jobsetevalinputs", default)]
    pub jobset_eval_inputs: HashMap<String, Input>,
    timestamp: u64,
    builds: Vec<BuildId>,
}

impl Release {
    pub fn version(&self) -> String {
        let re = Regex::new(".+-(?<ver>[0-9].+)").unwrap();
        let caps = re.captures(&self.nix_name).expect("Failed to parse the release name");

        caps["ver"].to_string()
    }

    pub fn evaluation_url(&self, hydra_base_uri: &str) -> String {
        let eval_id = self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?");
        format!("{}/eval/{}", hydra_base_uri, eval_id)
    }

    pub fn job_url(&self, hydra_base_uri: &str, job_name: &str) -> String {
        let eval_id = self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?");
        format!("{}/eval/{}/job/{}", hydra_base_uri, eval_id, job_name)
    }

    /// Directory related to this release.
    fn directory(&self, channel: &Channel) -> String {
        match channel.name.as_str() {
            "nixpkgs-unstable" => "nixpkgs".to_string(),
            _ => format!("{}/{}", channel.prefix(), channel.version())
        }
    }

    pub fn prefix(&self, channel: &Channel) -> object_store::path::Path {
        format!("{}/{}", self.directory(channel), self.nix_name).into()
    }
}

pub fn release_uri(hydra_uri: &str, job_name: &str) -> String {
    format!("{}/job/{}/latest", hydra_uri, job_name)
}

pub struct HydraClient<'a> {
    pub config: &'a MirrorConfig,
}

impl HydraClient<'_> {
    pub async fn fetch_release(&self, job_name: &str) -> reqwest::Result<Release> {
        let client = reqwest::Client::new();
        let resp = client.get(release_uri(&self.config.hydra_uri, job_name))
            .header("Accept", "application/json")
            // TODO: put a proper version
            .header("User-Agent", "nixos-channel-scripts (rust)")
            .send()
            .await?;

        resp.json().await
    }

    pub async fn fetch_evaluation(&self, release: &Release) -> reqwest::Result<Evaluation> {
        let client = reqwest::Client::new();
        let resp = client.get(release.evaluation_url(&self.config.hydra_uri))
            .header("Accept", "application/json")
            .header("User-Agent", "nixos-channel-scripts (rust)")
            .send()
            .await?;

        resp.json().await
    }

    pub async fn copy_hydra_product<W: AsyncWrite + Unpin>(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
        let client = reqwest::Client::new();
        let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
            .header("Accept", "application/json")
            .header("User-Agent", "nixos-channel-scripts (rust)")
            .send()
            .await?
            .json()
            .await
            .context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;

        let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
        for (_, product) in build_info.build_products {
            if products_by_subtype.contains_key(&product.subtype) {
                todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
            }

            products_by_subtype.insert(product.subtype.clone(), product);
        }

        if products_by_subtype.len() > 1 && product_type.is_none() {
            panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
        }

        let product: BuildProduct = if let Some(ptype) = product_type {
            products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
        } else {
            products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
        };

        // 3. FIXME: validate sha256hash during read?
        let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
        crate::nar::copy_file_in_nar(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap(), pbar, out).await.context("while copying the NAR to the target")
    }
}

src/main.rs (new file): 160 lines
@@ -0,0 +1,160 @@
mod config;
mod actions;
mod hydra;
mod git;
mod nar;

use std::path::PathBuf;

use actions::{interpret_plan, Plan};
use clap::{Subcommand, Parser, Args};
use hydra::{Channel, Evaluation, HydraClient, Release};
use log::{info, trace, warn};
use object_store::{aws::{AmazonS3, AmazonS3Builder}, ObjectStore};

#[derive(Debug, Args)]
struct ChannelArgs {
    /// Channel name to update
    channel_name: String,
    /// Job name to fetch from the Hydra instance configured
    job_name: String,
}

#[derive(Debug, Args)]
struct GlobalOpts {
    /// TOML configuration file for channel updates
    #[arg(short, long)]
    config_file: PathBuf,
    /// Whether to execute no remote side effects (S3 uploads, redirections), etc.
    #[arg(short, long, default_value_t = false)]
    dry_run: bool,
    /// Whether to bypass all preflight checks
    #[arg(short, long, default_value_t = false)]
    bypass_preflight_checks: bool
}

#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct App {
    #[command(flatten)]
    global_opts: GlobalOpts,
    #[command(subcommand)]
    command: Commands
}

#[derive(Debug, Subcommand)]
enum Commands {
    /// Print the plan for the given channel name and job name
    Plan(ChannelArgs),
    /// Apply the plan that would be generated for the given channel name and job name
    Apply(ChannelArgs),
}

#[derive(Debug)]
struct PreflightCheckContext<'a> {
    target_channel: &'a Channel,
    new_release: &'a Release,
    new_evaluation: &'a Evaluation,
    current_release_path: Option<object_store::path::Path>,
}

async fn run_preflight_checks(channel: &Channel, release: &Release, evaluation: &Evaluation, channel_bucket: AmazonS3) -> bool {
    info!("Running pre-flight checks...");
    let channel_name = object_store::path::Path::parse(&channel.name).expect("Channel name should be a valid S3 path");
    let mut context = PreflightCheckContext {
        target_channel: channel,
        new_release: release,
        new_evaluation: evaluation,
        current_release_path: None
    };
    match channel_bucket.get_opts(&channel_name, object_store::GetOptions { head: true, ..Default::default() }).await {
        Ok(object_store::GetResult { attributes, meta, .. }) => {
            info!("Release found: {:?}", meta);
            trace!("Attributes: {:#?}", attributes);
            if let Some(redirection_target) = attributes.get(&object_store::Attribute::Metadata("x-amz-website-redirect-location".into())) {
                context.current_release_path = Some(redirection_target.to_string().into());
            }
        },
        Err(err) => {
            warn!("Error while asking the channel bucket: {}", err);
            todo!();
        }
    }

    trace!("Preflight context check assembled: {:?}", context);
    // TODO: run anti rollback protection
    true
}


#[tokio::main]
async fn main() -> std::io::Result<()> {
    pretty_env_logger::init();

    let args = App::parse();

    let config: config::MirrorConfig = toml::from_str(&std::fs::read_to_string(args.global_opts.config_file)
        .expect("Failed to read the configuration file"))
        .expect("Failed to deserialize the configuration file");

    println!("config: {:?}", config);

    let hydra_client: HydraClient = HydraClient {
        config: &config
    };

    match args.command {
        Commands::Plan(channel) => {
            let chan = Channel {
                name: channel.channel_name,
            };
            info!("Planning for channel {} using job {}", chan.name, channel.job_name);
            let release = hydra_client.fetch_release(&channel.job_name)
                .await.expect("Failed to fetch release");
            trace!("{:#?}", release);
            let evaluation = hydra_client.fetch_evaluation(&release)
                .await.expect("Failed to fetch evaluation");
            trace!("{:?}", evaluation.jobset_eval_inputs);

            if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").expect("Expected a nixpkgs repository") {
                info!("Release information:\n- Release is: {} (build {})\n- Eval is: {}\n- Prefix is: {}\n- Git commit is {}\n",
                    release.nix_name, release.id, evaluation.id, release.prefix(&chan), revision);

                if args.global_opts.bypass_preflight_checks {
                    let actions = actions::generate_plan(&config, &chan, &release, evaluation).await;
                    log::debug!("Action plan: {:#?}", actions);

                    let staging_dir = interpret_plan(&hydra_client, Plan {
                        actions,
                        release,
                        channel_name: chan.name.clone(),
                        target_bucket: "something".to_string()
                    }, false).await;
                    info!("Plan interpreted in {}", staging_dir.path().display());
                } else {
                    let release_bucket = config.release_bucket();

                    // If the release already exists, skip it.
                    if release_bucket.head(&release.prefix(&chan)).await.is_ok() {
                        log::warn!("Release already exists, skipping");
                        return Ok(());
                    }

                    // Runs the preflight checks.
                    let channel_bucket = config.channel_bucket();
                    if run_preflight_checks(&chan, &release, &evaluation, channel_bucket).await {
                        info!("Preflight checks passed");
                    } else {
                        log::error!("Preflight check failed, cannot continue, pass `--force` if you want to bypass preflight checks");
                    }

                }
            } else {
                panic!("Nixpkgs input is not of type Git");
            }
        },
        Commands::Apply(_) => todo!(),
    }

    Ok(())
}

src/nar.rs (new file): 106 lines
@@ -0,0 +1,106 @@
use std::borrow::{Borrow, BorrowMut};

use anyhow::{bail, Context};
use async_compression::tokio::bufread::ZstdDecoder;
use futures::{Stream, StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
use log::debug;
use nix_compat::nar::listing::{Listing, ListingEntry};
use nix_compat::narinfo::NarInfo;
use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_util::io::StreamReader;

/// Returns a NarInfo from a HTTP binary cache.
pub async fn get_narinfo(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<String> {
    let client = reqwest::Client::new();
    let resp = client.get(format!("{}/{}.narinfo", binary_cache_uri, narinfo_hash))
        .header("User-Agent", "nixos-channel-scripts (rust)")
        .send()
        .await?;

    resp.text().await
}

/// Returns a listing from a HTTP binary cache.
pub async fn get_listing(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<Listing> {
    let client = reqwest::Client::builder().brotli(true).build().unwrap();
    let resp = client.get(format!("{}/{}.ls", binary_cache_uri, narinfo_hash))
        .header("User-Agent", "nixos-channel-scripts (rust)")
        .send()
        .await?;

    resp.json().await
}

/// Returns a `Content-Range`-aware HTTP body reader of the NAR.
pub async fn ranged_nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>, start: u64, end: u64) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
    let client = reqwest::Client::new();
    // TODO: handle compression?
    let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
        .header("User-Agent", "nixos-channel-scripts (rust)")
        .header("Content-Range", format!("bytes {}-{}/*", start, end))
        .send()
        .await?;

    Ok(resp.bytes_stream())
}

pub async fn nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
    let client = reqwest::Client::new();
    // TODO: handle compression?
    let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
        .header("User-Agent", "nixos-channel-scripts (rust)")
        .send()
        .await?;

    Ok(resp.bytes_stream())
}

/// Hits the binary cache with a narinfo request
/// and copies a specific file to the provided writer
/// asynchronously.
pub async fn copy_file_in_nar<W: AsyncWrite + Unpin>(binary_cache_uri: &str, narinfo_hash: &str, path: &str, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
    let narinfo_str: String = get_narinfo(binary_cache_uri, narinfo_hash).await?;
    let narinfo: NarInfo = NarInfo::parse(narinfo_str.as_ref()).context("while parsing the narinfo").expect("Failed to parse narinfo from HTTP binary cache server");
    let listing: Listing = get_listing(binary_cache_uri, narinfo_hash).await.context("while parsing the listing of that NAR")?;
    match listing {
        Listing::V1 { root, .. } => {
            let entry = root.locate(path)
                .expect("Invalid relative path to the NAR")
                .ok_or(tokio::io::Error::new(tokio::io::ErrorKind::NotFound, format!("Entry {} not found in the NAR listing", path)))?;

            if let ListingEntry::Regular {
                size, nar_offset, ..
            } = entry {
                pbar.set_length(*size);
                pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
                pbar.set_message(format!("Downloading and decompressing '{}' @ '{}' to the staging directory", narinfo_hash, path));

                // TODO: this is ugly.
                let mut file_reader = ZstdDecoder::new(
                    StreamReader::new(nar_reader(binary_cache_uri, narinfo).await?.map_err(std::io::Error::other))
                );

                // Discard *nar_offset bytes
                let discarded = tokio::io::copy(&mut file_reader.borrow_mut().take(*nar_offset), &mut pbar.wrap_async_write(tokio::io::sink())).await
                    .context("while discarding the start of the NAR").unwrap();

                debug!("discarded {} bytes", discarded);

                // Copy at most `*size` bytes now.
                let copied = tokio::io::copy(&mut file_reader.take(*size), &mut pbar.wrap_async_write(out)).await
                    .context("while copying the file in the NAR").unwrap();

                assert!(copied == *size, "mismatch in the copy sizes");
                pbar.finish();

                return Ok(copied)
            } else {
                bail!("Expected a file, obtained either a symlink or a directory");
            }

        },
        _ => bail!("Unsupported listing version")
    }
}

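To illustrate how the pieces above compose: copy_file_in_nar is what src/hydra.rs ultimately calls to pull a single file out of a zstd-compressed NAR served by the binary cache. A minimal, hypothetical call site could look like the following; the narinfo hash and in-NAR path are placeholders, and the real caller in this commit is HydraClient::copy_hydra_product:

    // Hypothetical usage sketch, not part of the commit.
    let pbar = indicatif::ProgressBar::new(0);
    let mut out = tokio::fs::File::create("nixexprs.tar.xz").await?;
    let copied = crate::nar::copy_file_in_nar(
        "https://cache.forkos.org",   // binary cache configured in forkos.toml
        "<narinfo hash>",             // nixbase32 digest of the product's store path
        "nixexprs.tar.xz",            // path of the file inside the NAR listing
        pbar,
        &mut out,
    ).await?;
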