feat(*): init Rust port

This is a Rust port of the original Perl script; legacy cruft has been
removed, and the port focuses on a modern Hydra deployment.

Nonetheless, it still knows how to perform migrations based on the
channel versions.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
raito 2024-08-02 17:45:59 +02:00
parent 809d960f49
commit 414f582fc4
15 changed files with 3947 additions and 6 deletions

5
.gitignore vendored

@@ -1 +1,6 @@
result
# Added by cargo
/target

2694
Cargo.lock generated Normal file

File diff suppressed because it is too large

27
Cargo.toml Normal file

@@ -0,0 +1,27 @@
[package]
name = "nixos-channel-scripts"
version = "0.1.0"
edition = "2021"
[dependencies]
clap = { version = "4.5.13", features = [ "derive" ] }
indicatif = { version = "0.17.8", features = ["futures", "tokio"] }
object_store = { version = "0.10.2", features = [ "aws" ] }
regex = "1.10.6"
reqwest = { version = "0.12.5", features = ["brotli"] }
serde = "1.0.204"
tempdir = "0.3.7"
textwrap = "0.16.1"
tokio = { version = "1.39.2", features = ["full"] }
toml = "0.8.19"
nix-compat = { git = "https://github.com/tvlfyi/tvix" }
futures = "0.3.30"
anyhow = "1.0.86"
bytes = "1.7.1"
async-compression = { version = "0.4.12", features = ["tokio", "zstd"] }
tokio-util = { version = "0.7.11", features = ["io"] }
termtree = "0.5.1"
walkdir = "2.5.0"
handlebars = "6.0.0"
serde_json = "1.0.127"
tracing = { version = "0.1.40", features = ["max_level_debug", "release_max_level_info"] }

View file

@@ -1,3 +1,11 @@
-(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
-  src = builtins.fetchGit ./.;
-}).defaultNix
+{ pkgs ? import <nixpkgs> {} }:
+{
+  shell = pkgs.mkShell {
+    buildInputs = [
+      pkgs.cargo
+      pkgs.rustc
+      pkgs.openssl
+      pkgs.pkg-config
+    ];
+  };
+}

5
forkos.toml Normal file

@@ -0,0 +1,5 @@
hydra_uri = "https://hydra.forkos.org"
binary_cache_uri = "https://cache.forkos.org"
nixpkgs_dir = "/var/lib/nixpkgs"
s3_release_bucket_name = "bagel-channel-scripts-test"
s3_channel_bucket_name = "bagel-channel-scripts-test"

View file

@@ -1,3 +0,0 @@
(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
src = builtins.fetchGit ./.;
}).shellNix

324
src/actions.rs Normal file

@@ -0,0 +1,324 @@
use std::path::PathBuf;
use anyhow::Context;
use futures::future::join_all;
use futures::TryStreamExt;
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressDrawTarget;
use indicatif::ProgressStyle;
use tracing::debug;
use object_store::aws::AmazonS3;
use object_store::ObjectStore;
use object_store::WriteMultipart;
use tempdir::TempDir;
use crate::hydra;
use crate::config;
const MULTIPART_CHUNK_SIZE: usize = 100 * 1024 * 1024; // 100 MB
#[derive(Debug, Copy, Clone)]
pub enum HydraProductType {
BrotliJson,
SourceDistribution,
}
#[derive(Debug)]
pub enum Compression {
Zstd,
Brotli,
None
}
impl Compression {
pub fn http_encoding(&self) -> Option<&str> {
Some(match self {
Self::Zstd => "zstd",
Self::Brotli => "br",
Self::None => return None,
})
}
pub fn file_extension(&self) -> Option<&str> {
Some(match self {
Self::Zstd => "zst",
Self::Brotli => "br",
Self::None => return None,
})
}
}
impl ToString for HydraProductType {
fn to_string(&self) -> String {
match self {
Self::BrotliJson => "json-br".to_string(),
Self::SourceDistribution => "source-dist".to_string(),
}
}
}
#[derive(Debug)]
pub enum Action {
WriteFile {
dst_path: PathBuf,
contents: Vec<u8>
},
WriteHydraProduct {
product_name: String,
dst_path: Option<PathBuf>,
product_type: Option<HydraProductType>,
},
WriteCompressedStorePaths {
compression: Compression
}
}
#[derive(Debug)]
pub struct Plan {
pub actions: Vec<Action>,
pub release: hydra::Release,
pub channel_name: String,
pub target_bucket: AmazonS3
}
#[derive(Debug)]
pub struct PlanOptions {
pub streaming: bool,
pub enable_progress_bar: bool
}
macro_rules! plan_write_file {
($plan_container:ident, $dst_path:literal, $contents:expr) => {
$plan_container.push(Action::WriteFile {
dst_path: $dst_path.into(),
contents: $contents
});
}
}
macro_rules! plan_write_hydra_product {
($plan_container:ident, $pname:expr) => {
$plan_container.push(Action::WriteHydraProduct {
product_name: $pname.into(),
dst_path: None,
product_type: None
});
};
($plan_container:ident, $dst_path:literal, $pname:expr) => {
$plan_container.push(Action::WriteHydraProduct {
dst_path: Some($dst_path.into()),
product_name: $pname.into(),
product_type: None
});
};
($plan_container:ident, $dst_path:literal, $pname:expr, $ptype:expr) => {
$plan_container.push(Action::WriteHydraProduct {
dst_path: Some($dst_path.into()),
product_name: $pname.into(),
product_type: Some($ptype)
});
};
}
macro_rules! plan_iso {
($plan_container:ident, $iso_name:literal, $arch:literal) => {
plan_write_hydra_product!($plan_container, format!("nixos.iso_{}.{}", $iso_name, $arch));
}
}
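// For example, `plan_iso!(plan, "minimal", "x86_64-linux")` expands to a WriteHydraProduct
// action for the "nixos.iso_minimal.x86_64-linux" Hydra product.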
#[tracing::instrument]
pub async fn generate_plan(config: &config::MirrorConfig, channel: &hydra::Channel, release: &hydra::Release, evaluation: hydra::Evaluation) -> Vec<Action> {
let mut plan = vec![];
/*plan.push(Action::WriteCompressedStorePaths {
compression: Compression::Zstd
});*/
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").unwrap() {
plan_write_file!(plan, "src-url", release.evaluation_url(&config.hydra_uri).into());
plan_write_file!(plan, "git-revision", revision.clone().into());
plan_write_file!(plan, "binary-cache-url", config.binary_cache_uri.clone().into());
if matches!(channel.r#type(), hydra::ChannelType::NixOS) {
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "nixos.channel");
plan_write_hydra_product!(plan, "packages.json.br", "nixpkgs.tarball",
HydraProductType::BrotliJson
);
plan_write_hydra_product!(plan, "options.json.br", "nixos.options",
HydraProductType::BrotliJson
);
// TODO: we don't have aarch64-linux yet
//plan_iso!(plan, "minimal", "aarch64-linux");
plan_iso!(plan, "minimal", "x86_64-linux");
if !matches!(channel.variant(), hydra::ChannelVariant::Small) {
// TODO: for some reason, those ISOs are not present in ForkOS hydra.
// TODO: implement error recovery, skipping or failure?
// maybe encode this in the plan (fallible, etc.)
// TODO: compat with nixos-20, 21, 22, 23 using plasma5?
// should we have plasma_latest?
//plan_iso!(plan, "plasma6", "aarch64-linux");
//plan_iso!(plan, "plasma6", "x86_64-linux");
//plan_iso!(plan, "gnome", "aarch64-linux");
//plan_iso!(plan, "gnome", "x86_64-linux");
// TODO: i686-linux ISO compat? for nixos ≤ 23.11
// FIXME: weird OVA appliance here, should investigate what to do
//plan_write_hydra_product!(plan, "nixos.ova.x86_64-linux");
}
} else {
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "tarball",
HydraProductType::SourceDistribution);
plan_write_hydra_product!(plan, "packages.json.br", "tarball",
HydraProductType::BrotliJson);
}
}
// TODO: take into account the SQLite database & debuginfo
plan
}
#[derive(Debug)]
pub enum PlanInterpretation {
/// The plan has been streamed to the target object storage.
/// It's ready to be applied by atomically promoting this temporary object store path.
Streamed(object_store::path::Path),
/// The plan has been prepared locally and can now be inspected or uploaded.
LocallyPlanned(TempDir)
}
/// Interprets the plan as a list of actions.
///
/// It creates a staging directory where all actions are prepared.
/// No irreversible action is committed.
///
/// At the end, it returns the staging directory for further post-processing, including possible
/// uploads.
///
/// Enabling streaming will stream the contents of the staging directory directly to the
/// target bucket, under a temporary suffix of the target prefix, instead of preparing them locally.
/// The caller is responsible for promoting that upload to a finalized upload.
#[tracing::instrument]
pub async fn interpret_plan(hydra_client: &hydra::HydraClient<'_>, plan: Plan, plan_opts: PlanOptions) -> PlanInterpretation {
// Prepare a staging directory
let staging_directory = TempDir::new(format!("staging-{}", &plan.channel_name).as_str()).expect("Failed to create temporary directory for staging directory");
let staging_prefix = staging_directory.path().file_name()
.expect("Failed to obtain the final component of the staging directory")
.to_str()
.expect("Failed to convert the final component into string")
.to_owned();
let streaming = plan_opts.streaming;
if streaming {
debug!("Streaming the update to /{}", staging_prefix);
} else {
debug!("Preparing locally the update in {}", staging_directory.path().display());
}
// Initialize multiple progress bars if we are to show progress bars.
let multi_pbar = MultiProgress::with_draw_target(if plan_opts.enable_progress_bar {
ProgressDrawTarget::stdout()
} else {
ProgressDrawTarget::hidden()
});
// Pending download/upload/streaming tasks.
let mut pending = Vec::new();
for action in plan.actions {
let pbar = ProgressBar::new(0);
let pbar = multi_pbar.add(pbar.clone());
pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
let task = async {
match action {
Action::WriteHydraProduct { product_name, dst_path, product_type } => {
let dst_path = dst_path.unwrap_or_else(|| product_name.clone().into());
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths need to be relative so they can be prefixed by the staging directory");
let mut copied: u64;
if streaming {
copied = 0;
pbar.set_message(format!("Downloading and streaming '{}'", product_name));
let bucket_path = format!("{}/{}", staging_prefix, dst_path.to_str().unwrap()).try_into().unwrap();
let mpart = plan.target_bucket.put_multipart(&bucket_path).await.expect(&format!("Failed to create the multipart uploader for {}", bucket_path));
let mut writer = WriteMultipart::new_with_chunk_size(mpart, MULTIPART_CHUNK_SIZE);
let (size, mut stream) = hydra_client.hydra_product_bytes_stream(&plan.release, &product_name, product_type).await
.context("while preparing the stream for a Hydra product")
.expect("Failed to stream an Hydra product");
pbar.set_length(size);
while let Some(bytes) = stream.try_next()
.await
.context(format!("while streaming the hydra product {} ({:?}) to {}", product_name, product_type, dst_path.display()))
.expect("Failed to stream the Hydra product") {
// OK, this is not really writing them but if `bytes` is small enough
// the Put request should complete very quickly, thus… it should be
// fine?
pbar.inc(bytes.len() as u64);
copied += bytes.len() as u64;
writer.put(bytes);
}
pbar.set_message(format!("Downloaded. Streaming '{}'", product_name));
writer.finish().await.context(format!("while finishing to stream the hydra product {}", product_name)).unwrap();
pbar.finish_with_message(format!("Streamed '{}'", product_name));
} else {
pbar.set_message(format!("Copying '{}'", product_name));
let mut tgt_file = tokio::fs::File::create(staging_directory.path().join(dst_path.clone())).await.unwrap();
copied = hydra_client.copy_hydra_product(&plan.release, &product_name, product_type, pbar, &mut tgt_file).await
.context(format!("while writing the hydra product {} ({:?}) to {}", product_name, product_type, dst_path.display()))
.expect("Failed to copy an Hydra product");
}
debug!("Copied {} bytes", copied);
},
Action::WriteFile { dst_path, contents } => {
// We don't need a progress bar here.
multi_pbar.remove(&pbar);
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths need to be relative so they can be prefixed by the staging directory");
let size = contents.len();
if streaming {
let bucket_path = format!("{}/{}", staging_prefix, dst_path.to_str().unwrap().to_owned()).try_into().unwrap();
plan.target_bucket.put(&bucket_path, contents.into()).await.context(format!("while finishing to stream a file contents")).unwrap();
} else {
tokio::fs::write(staging_directory.path().join(&dst_path), contents).await.expect("Failed to write a file");
}
debug!("Written {} bytes for {:?}", size, dst_path.file_name().unwrap());
},
Action::WriteCompressedStorePaths { compression } => {
match compression {
Compression::Zstd => todo!(),
Compression::Brotli => todo!(),
Compression::None => todo!(),
}
let spaths = hydra_client.fetch_store_paths(&plan.release).await.context("while fetching store paths").expect("Failed to fetch store paths from Hydra");
let _serialized = serde_json::to_vec(&spaths).context("while serializing store paths to JSON").expect("failed to serialize");
// TODO: Compress in-memory...
todo!();
},
};
};
pending.push(task);
}
// TODO: Write an HTML page of release with the release name.
join_all(pending).await;
if streaming {
staging_directory.close().expect("Failed to get rid of the staging directory");
PlanInterpretation::Streamed(staging_prefix.into())
} else {
PlanInterpretation::LocallyPlanned(staging_directory)
}
}

33
src/config.rs Normal file

@@ -0,0 +1,33 @@
use object_store::aws::{AmazonS3, AmazonS3Builder};
use serde::Deserialize;
use std::path::PathBuf;
#[derive(Debug, Deserialize)]
pub struct MirrorConfig {
/// URI to Hydra instance
pub hydra_uri: String,
/// URI to the binary cache
pub binary_cache_uri: String,
/// A path to a checkout of nixpkgs
nixpkgs_dir: PathBuf,
/// S3 releases bucket name
pub s3_release_bucket_name: String,
/// S3 channels bucket name
s3_channel_bucket_name: String,
}
impl MirrorConfig {
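/// Builds an S3 client for the release bucket.
/// Since `from_env` is used, credentials, region and endpoint are expected to come from the
/// environment (e.g. AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION, AWS_ENDPOINT).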
pub fn release_bucket(&self) -> AmazonS3 {
AmazonS3Builder::from_env()
.with_bucket_name(&self.s3_release_bucket_name)
.build()
.expect("Failed to connect to the S3 release bucket")
}
pub fn channel_bucket(&self) -> AmazonS3 {
AmazonS3Builder::from_env()
.with_bucket_name(&self.s3_channel_bucket_name)
.build()
.expect("Failed to connect to the S3 channel bucket")
}
}

1
src/git.rs Normal file

@@ -0,0 +1 @@

29
src/html.rs Normal file

@@ -0,0 +1,29 @@
use handlebars::Handlebars;
use serde::Serialize;
#[derive(Debug, Serialize)]
struct ReleaseFileData {
name: String,
size: String,
sha256: String
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ReleasePageData {
channel_name: String,
release_name: String,
release_date: String,
git_commit_link: String,
git_revision: String,
hydra_eval_link: String,
hydra_eval_id: u64,
files: Vec<ReleaseFileData>
}
fn render_release_page(page_data: ReleasePageData) -> Result<String, handlebars::RenderError> {
let reg = Handlebars::new();
let release_template = include_str!("./release.tpl");
reg.render_template(&release_template, &page_data)
}

334
src/hydra.rs Normal file

@@ -0,0 +1,334 @@
use std::{collections::HashMap, path::PathBuf};
use anyhow::Context;
use bytes::Bytes;
use futures::Stream;
use indicatif::ProgressBar;
use nix_compat::store_path::{StorePath, StorePathRef};
use regex::Regex;
use serde::{de::Error, Deserialize, Deserializer};
use tokio::io::AsyncWrite;
use crate::{actions::HydraProductType, config::MirrorConfig};
pub type ReleaseId = u64;
pub type EvaluationId = u64;
pub type BuildId = u64;
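/// Deserializes a boolean that the Hydra JSON API encodes as a 0/1 integer.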
fn deser_bool_as_int<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
u8::deserialize(deserializer).map(|b| b == 1)
}
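/// Deserializes a boolean that the Hydra JSON API encodes as the string "true" or "false".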
fn deser_bool_as_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
let s: &str = Deserialize::deserialize(deserializer)?;
match s {
"false" => Ok(false),
"true" => Ok(true),
_ => Err(Error::unknown_variant(s, &["false", "true"]))
}
}
#[derive(Debug, Deserialize)]
pub struct OutputPath {
#[serde(rename="path")]
store_path: String
}
#[derive(Debug, Deserialize)]
pub struct BuildProduct {
#[serde(rename="path")]
store_path: String,
name: String,
subtype: String,
sha256hash: String,
r#type: String,
#[serde(rename="filesize")]
file_size: u64
}
#[derive(Debug, Deserialize)]
pub struct BuildInfo {
jobset: String,
job: String,
project: String,
#[serde(deserialize_with = "deser_bool_as_int", rename="buildstatus")]
finished: bool,
#[serde(rename="releasename")]
release_name: Option<String>,
#[serde(rename="stoptime")]
stop_time: u64,
#[serde(rename="starttime")]
start_time: u64,
system: String,
id: BuildId,
#[serde(rename="buildoutputs")]
build_outputs: HashMap<String, OutputPath>,
#[serde(rename="buildproducts")]
build_products: HashMap<String, BuildProduct>,
#[serde(rename="nixname")]
nix_name: String,
#[serde(rename="drvpath")]
drv_path: String,
}
#[derive(Debug)]
pub struct Channel {
pub name: String
}
#[non_exhaustive]
pub enum ChannelType {
NixOS,
Nixpkgs,
}
#[non_exhaustive]
pub enum ChannelVariant {
Normal,
Small
}
impl Channel {
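/// Returns the version part of the channel name,
/// e.g. a name like "nixos-24.05" (illustrative) yields "24.05".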
pub fn version(&self) -> String {
let re = Regex::new("([a-z]+)-(?<ver>.*)").unwrap();
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
caps["ver"].to_string()
}
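/// Returns the prefix part of the channel name,
/// e.g. a name like "nixos-24.05" (illustrative) yields "nixos".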
pub fn prefix(&self) -> String {
let re = Regex::new("(?<name>[a-z]+)-(.*)").unwrap();
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
caps["name"].to_string()
}
// TODO: just regex match?
pub fn r#type(&self) -> ChannelType {
ChannelType::NixOS
}
pub fn variant(&self) -> ChannelVariant {
ChannelVariant::Normal
}
}
#[derive(Deserialize, Debug)]
pub struct Release {
pub id: ReleaseId,
job: String,
#[serde(rename = "releasename")]
release_name: Option<String>,
#[serde(rename = "starttime")]
start_time: u64,
#[serde(rename = "stoptime")]
stop_time: u64,
#[serde(rename = "nixname")]
pub nix_name: String,
#[serde(rename = "jobsetevals")]
jobset_evals: Vec<EvaluationId>,
jobset: String,
#[serde(deserialize_with = "deser_bool_as_int")]
finished: bool,
priority: u64,
system: String,
timestamp: u64,
project: String,
#[serde(rename = "drvpath")]
derivation_path: String,
// ignored: buildproducts, buildoutputs, buildmetrics, buildstatus
}
#[derive(Debug)]
pub struct GitInput {
uri: String,
revision: String
}
// FIXME(Raito): for some reason, #[serde(tag = "type", rename_all = "lowercase")] doesn't behave
// correctly and causes deserialization failures in practice. `untagged` is suboptimal but works
// because of the way the responses are shaped...
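// Illustrative input shapes as returned by Hydra (exact payloads may vary):
//   { "type": "git", "uri": "https://...", "revision": "<commit>" }
//   { "type": "boolean", "value": "true" }
//   { "type": "nix", "value": "..." }
// With `untagged`, serde tries each variant in declaration order and ignores the `type` field.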
#[derive(Debug, Deserialize)]
#[serde(untagged, expecting = "An valid jobset input")]
pub enum Input {
Boolean {
#[serde(deserialize_with = "deser_bool_as_string")]
value: bool
},
Git { uri: String, revision: String },
Nix { value: String },
}
#[derive(Deserialize, Debug)]
pub struct Evaluation {
pub id: EvaluationId,
#[serde(rename="checkouttime")]
checkout_time: u64,
#[serde(rename="evaltime")]
eval_time: u64,
flake: Option<String>,
#[serde(rename="jobsetevalinputs", default)]
pub jobset_eval_inputs: HashMap<String, Input>,
timestamp: u64,
builds: Vec<BuildId>,
}
impl Release {
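/// Returns the version part of the release's nix name,
/// e.g. a name like "nixos-24.05.20240802.abc1234" (illustrative) yields "24.05.20240802.abc1234".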
pub fn version(&self) -> String {
let re = Regex::new(".+-(?<ver>[0-9].+)").unwrap();
let caps = re.captures(&self.nix_name).expect("Failed to parse the release name");
caps["ver"].to_string()
}
pub fn evaluation_url(&self, hydra_base_uri: &str) -> String {
let eval_id = self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?");
format!("{}/eval/{}", hydra_base_uri, eval_id)
}
pub fn job_url(&self, hydra_base_uri: &str, job_name: &str) -> String {
let eval_id = self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?");
format!("{}/eval/{}/job/{}", hydra_base_uri, eval_id, job_name)
}
pub fn store_paths_url(&self, hydra_base_uri: &str) -> String {
let eval_id = self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?");
format!("{}/eval/{}/store-paths", hydra_base_uri, eval_id)
}
/// Directory related to this release.
fn directory(&self, channel: &Channel) -> String {
match channel.name.as_str() {
"nixpkgs-unstable" => "nixpkgs".to_string(),
_ => format!("{}/{}", channel.prefix(), channel.version())
}
}
pub fn prefix(&self, channel: &Channel) -> object_store::path::Path {
format!("{}/{}", self.directory(channel), self.nix_name).into()
}
}
pub fn release_uri(hydra_uri: &str, job_name: &str) -> String {
format!("{}/job/{}/latest", hydra_uri, job_name)
}
#[derive(Debug)]
pub struct HydraClient<'a> {
pub config: &'a MirrorConfig,
}
impl HydraClient<'_> {
pub async fn fetch_release(&self, job_name: &str) -> reqwest::Result<Release> {
let client = reqwest::Client::new();
let resp = client.get(release_uri(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
// TODO: put a proper version
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
pub async fn fetch_evaluation(&self, release: &Release) -> reqwest::Result<Evaluation> {
let client = reqwest::Client::new();
let resp = client.get(release.evaluation_url(&self.config.hydra_uri))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
pub async fn fetch_store_paths(&self, release: &Release) -> anyhow::Result<Vec<StorePath<String>>> {
let client = reqwest::Client::new();
Ok(client.get(release.store_paths_url(&self.config.hydra_uri))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading store-paths information for an evaluation"))?
)
}
pub async fn hydra_product_bytes_stream(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
let client = reqwest::Client::new();
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
for (_, product) in build_info.build_products {
if products_by_subtype.contains_key(&product.subtype) {
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
}
products_by_subtype.insert(product.subtype.clone(), product);
}
if products_by_subtype.len() > 1 && product_type.is_none() {
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
}
let product: BuildProduct = if let Some(ptype) = product_type {
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
} else {
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
};
// FIXME: validate sha256hash during read?
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
crate::nar::file_in_nar_bytes_stream(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap()).await.context("while copying the NAR to the target")
}
pub async fn copy_hydra_product<W: AsyncWrite + Unpin>(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
// TODO: dry me?
let client = reqwest::Client::new();
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
for (_, product) in build_info.build_products {
if products_by_subtype.contains_key(&product.subtype) {
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
}
products_by_subtype.insert(product.subtype.clone(), product);
}
if products_by_subtype.len() > 1 && product_type.is_none() {
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
}
let product: BuildProduct = if let Some(ptype) = product_type {
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
} else {
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
};
// FIXME: validate sha256hash during read?
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
crate::nar::copy_file_in_nar(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap(), pbar, out).await.context("while copying the NAR to the target")
}
}

223
src/main.rs Normal file

@@ -0,0 +1,223 @@
mod config;
mod actions;
mod hydra;
mod git;
mod nar;
mod html;
use std::path::PathBuf;
use actions::{interpret_plan, Plan, PlanInterpretation, PlanOptions};
use clap::{Subcommand, Parser, Args};
use hydra::{Channel, Evaluation, HydraClient, Release};
use tracing::{info, trace, warn};
use object_store::{aws::AmazonS3, ObjectStore};
use tempdir::TempDir;
#[derive(Debug, Args)]
struct ChannelArgs {
/// Channel name to update
channel_name: String,
/// Job name to fetch from the Hydra instance configured
job_name: String,
}
#[derive(Debug, Args)]
struct RemoteArgs {
/// Channel name to update
channel_name: String,
/// Staging prefix
staging_prefix: String
}
#[derive(Debug, Args)]
struct GlobalOpts {
/// TOML configuration file for channel updates
#[arg(short, long)]
config_file: PathBuf,
/// Whether to skip all remote side effects (S3 uploads, redirections, etc.)
#[arg(short, long, default_value_t = false)]
dry_run: bool,
/// Whether to bypass all preflight checks
#[arg(short, long, default_value_t = false)]
bypass_preflight_checks: bool,
/// Enable or disable progress bar reports
#[arg(short, long, default_value_t = true)]
enable_progress_bar: bool,
/// Stream the plan instead of preparing it in a local directory
#[arg(short, long, default_value_t = true)]
stream: bool
}
#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct App {
#[command(flatten)]
global_opts: GlobalOpts,
#[command(subcommand)]
command: Commands
}
#[derive(Debug, Subcommand)]
enum Commands {
/// Print the plan for the given channel name and job name
Plan(ChannelArgs),
/// Apply the plan that would be generated for the given channel name and job name
Apply(ChannelArgs),
/// Apply an existing remote plan that was planned in the past
ApplyRemote(RemoteArgs),
}
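// Illustrative invocations (channel, job and staging-prefix names are made up):
//   nixos-channel-scripts --config-file forkos.toml plan nixos-unstable nixos/trunk-combined/tested
//   nixos-channel-scripts --config-file forkos.toml apply nixos-unstable nixos/trunk-combined/tested
//   nixos-channel-scripts --config-file forkos.toml apply-remote nixos-unstable staging-nixos-unstable.abc123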
#[derive(Debug)]
struct PreflightCheckContext<'a> {
target_channel: &'a Channel,
new_release: &'a Release,
new_evaluation: &'a Evaluation,
current_release_path: Option<object_store::path::Path>,
}
#[tracing::instrument]
async fn run_preflight_checks(channel: &Channel, release: &Release, evaluation: &Evaluation, channel_bucket: AmazonS3) -> bool {
info!("Running pre-flight checks...");
let channel_name = object_store::path::Path::parse(&channel.name).expect("Channel name should be a valid S3 path");
let mut context = PreflightCheckContext {
target_channel: channel,
new_release: release,
new_evaluation: evaluation,
current_release_path: None
};
match channel_bucket.get_opts(&channel_name, object_store::GetOptions { head: true, ..Default::default() }).await {
Ok(object_store::GetResult { attributes, meta, .. }) => {
info!("Release found: {:?}", meta);
trace!("Attributes: {:#?}", attributes);
if let Some(redirection_target) = attributes.get(&object_store::Attribute::Metadata("x-amz-website-redirect-location".into())) {
context.current_release_path = Some(redirection_target.to_string().into());
}
},
Err(err) => {
warn!("Error while asking the channel bucket: {}", err);
todo!();
}
}
trace!("Preflight context check assembled: {:?}", context);
// TODO: run anti rollback protection
true
}
enum PlanResult {
AlreadyExecuted,
LocallyPrepared(TempDir),
StreamedRemotely(object_store::path::Path),
}
impl From<PlanInterpretation> for PlanResult {
fn from(value: PlanInterpretation) -> Self {
match value {
PlanInterpretation::Streamed(prefix) => PlanResult::StreamedRemotely(prefix),
PlanInterpretation::LocallyPlanned(st_dir) => PlanResult::LocallyPrepared(st_dir)
}
}
}
#[tracing::instrument]
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<PlanResult> {
let chan = Channel {
name: channel_name.to_string()
};
info!("Planning for channel {} using job {}", &chan.name, &job_name);
let release = hydra_client.fetch_release(&job_name)
.await.expect("Failed to fetch release");
trace!("{:#?}", release);
let evaluation = hydra_client.fetch_evaluation(&release)
.await.expect("Failed to fetch evaluation");
trace!("{:?}", evaluation.jobset_eval_inputs);
let plan_options = PlanOptions {
streaming: global_opts.stream,
enable_progress_bar: global_opts.enable_progress_bar
};
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").expect("Expected a nixpkgs repository") {
info!("Release information:\n- Release is: {} (build {})\n- Eval is: {}\n- Prefix is: {}\n- Git commit is {}\n",
release.nix_name, release.id, evaluation.id, release.prefix(&chan), revision);
let release_bucket = config.release_bucket();
// If the release already exists, skip it.
if release_bucket.head(&release.prefix(&chan)).await.is_ok() {
tracing::warn!("Release already exists, skipping");
return Ok(PlanResult::AlreadyExecuted)
}
let channel_bucket = config.channel_bucket();
if global_opts.bypass_preflight_checks || run_preflight_checks(&chan, &release, &evaluation, channel_bucket).await {
let actions = actions::generate_plan(&config, &chan, &release, evaluation).await;
tracing::debug!("Action plan: {:#?}", actions);
let interpretation = interpret_plan(&hydra_client, Plan {
actions,
release,
channel_name: chan.name.clone(),
target_bucket: config.release_bucket(),
}, plan_options).await;
info!("Plan interpreted: {:?}", interpretation);
Ok(interpretation.into())
} else {
tracing::error!("Preflight check failed, cannot continue, pass `--force` if you want to bypass preflight checks");
Err(std::io::Error::other("preflight check failure"))
}
} else {
panic!("Nixpkgs input is not of type Git");
}
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let args = App::parse();
let config: config::MirrorConfig = toml::from_str(&std::fs::read_to_string(&args.global_opts.config_file)
.expect("Failed to read the configuration file"))
.expect("Failed to deserialize the configuration file");
println!("config: {:?}", config);
let hydra_client: HydraClient = HydraClient {
config: &config
};
match args.command {
Commands::Plan(channel) => {
plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
},
Commands::Apply(channel) => {
let plan = plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
match plan {
PlanResult::AlreadyExecuted => {
info!("Plan was already executed, no further action required");
},
PlanResult::LocallyPrepared(s) => todo!("Uploading a plan is a todo; use streaming!"),
PlanResult::StreamedRemotely(remote_path) => {
info!("Plan was streamed remotely successfully to '{}@{}'", config.s3_release_bucket_name, remote_path);
config.release_bucket().rename(&remote_path, &channel.channel_name.try_into().unwrap())
.await.expect("Failed to atomically promote the streamed channel, aborting");
// Perform atomic rename of the remote path
// into the target path.
}
}
},
Commands::ApplyRemote(remote) => {
let release = config.release_bucket();
release.rename(&remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
.await.expect("Failed to atomically rename the staging prefix into the channel name");
}
}
Ok(())
}

117
src/nar.rs Normal file

@@ -0,0 +1,117 @@
use std::borrow::{Borrow, BorrowMut};
use anyhow::{bail, Context};
use async_compression::tokio::bufread::ZstdDecoder;
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use indicatif::ProgressBar;
use tracing::debug;
use nix_compat::nar::listing::{Listing, ListingEntry};
use nix_compat::narinfo::NarInfo;
use tokio::io::{AsyncReadExt, AsyncWrite};
use tokio_util::io::{ReaderStream, StreamReader};
/// Returns a NarInfo from an HTTP binary cache.
#[tracing::instrument]
pub async fn get_narinfo(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<String> {
let client = reqwest::Client::new();
let resp = client.get(format!("{}/{}.narinfo", binary_cache_uri, narinfo_hash))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.text().await
}
/// Returns a listing from an HTTP binary cache.
#[tracing::instrument]
pub async fn get_listing(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<Listing> {
let client = reqwest::Client::builder().brotli(true).build().unwrap();
let resp = client.get(format!("{}/{}.ls", binary_cache_uri, narinfo_hash))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
/// Returns a range-limited HTTP body reader of the NAR, using the `Range` request header.
pub async fn ranged_nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>, start: u64, end: u64) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
let client = reqwest::Client::new();
// TODO: handle compression?
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
.header("User-Agent", "nixos-channel-scripts (rust)")
.header("Content-Range", format!("bytes {}-{}/*", start, end))
.send()
.await?;
Ok(resp.bytes_stream())
}
#[tracing::instrument]
pub async fn nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
let client = reqwest::Client::new();
// TODO: handle compression?
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
Ok(resp.bytes_stream())
}
/// Streams a binary cache file contained in a NAR
/// by using narinfo + listing information.
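/// The listing provides the offset and size of the file inside the uncompressed NAR, so the
/// NAR (assumed to be zstd-compressed, as the decoder below implies) is fetched and decompressed
/// as a stream, `nar_offset` bytes are discarded, and a reader capped at `size` bytes is returned
/// together with that size.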
#[tracing::instrument]
pub async fn file_in_nar_bytes_stream(binary_cache_uri: &str, narinfo_hash: &str, path: &str) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
let narinfo_str: String = get_narinfo(binary_cache_uri, narinfo_hash).await?;
let narinfo: NarInfo = NarInfo::parse(narinfo_str.as_ref()).context("while parsing the narinfo").expect("Failed to parse narinfo from HTTP binary cache server");
let listing: Listing = get_listing(binary_cache_uri, narinfo_hash).await.context("while parsing the listing of that NAR")?;
match listing {
Listing::V1 { root, .. } => {
let entry = root.locate(path)
.expect("Invalid relative path to the NAR")
.ok_or(tokio::io::Error::new(tokio::io::ErrorKind::NotFound, format!("Entry {} not found in the NAR listing", path)))?;
if let ListingEntry::Regular {
size, nar_offset, ..
} = entry {
// TODO: this is ugly.
let mut file_reader = ZstdDecoder::new(
StreamReader::new(nar_reader(binary_cache_uri, narinfo).await?.map_err(std::io::Error::other))
);
// Discard *nar_offset bytes
let discarded = tokio::io::copy(&mut file_reader.borrow_mut().take(*nar_offset), &mut tokio::io::sink()).await
.context("while discarding the start of the NAR").unwrap();
debug!("discarded {} bytes and taking at most {} bytes", discarded, *size);
// Let the consumer copy at most *size bytes of data.
Ok((*size, ReaderStream::new(file_reader.take(*size))))
} else {
bail!("Expected a file, obtained either a symlink or a directory");
}
},
_ => bail!("Unsupported listing version")
}
}
/// Hits the binary cache with a narinfo request
/// and copies a specific file to the provided writer
/// asynchronously.
pub async fn copy_file_in_nar<W: AsyncWrite + Unpin>(binary_cache_uri: &str, narinfo_hash: &str, path: &str, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
let (size, stream) = file_in_nar_bytes_stream(binary_cache_uri, narinfo_hash, path).await?;
pbar.set_length(size);
let copied = tokio::io::copy(
&mut StreamReader::new(stream),
&mut pbar.wrap_async_write(out)
).await.context("while copying the file in the NAR").unwrap();
assert!(copied == size, "mismatch in the copy sizes");
pbar.finish();
return Ok(copied)
}

72
src/release.html Normal file

@@ -0,0 +1,72 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{channelName}} - NixOS Channel</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.header {
margin-bottom: 20px;
}
.header h1 {
margin: 0;
font-size: 1.5em;
}
.header p {
margin: 5px 0;
color: #666;
}
.links a {
color: #0066cc;
text-decoration: none;
}
.file-list {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
.file-list th, .file-list td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
.file-list th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<div class="header">
<h1>Channel: {{channelName}}</h1>
<p>Release: {{releaseName}}</p>
<p>Date of Release: {{releaseDate}}</p>
<div class="links">
<a href="{{gitCommitLink}}">Git Commit</a> |
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
</div>
</div>
<table class="file-list">
<thead>
<tr>
<th>File Name</th>
<th>Size</th>
<th>SHA-256 Hash</th>
</tr>
</thead>
<tbody>
{{#each files}}
<tr>
<td>{{name}}</td>
<td>{{size}}</td>
<td>{{sha256}}</td>
</tr>
{{/each}}
</tbody>
</table>
</body>
</html>

72
src/release.tpl Normal file

@@ -0,0 +1,72 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{channelName}} - NixOS Channel</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.header {
margin-bottom: 20px;
}
.header h1 {
margin: 0;
font-size: 1.5em;
}
.header p {
margin: 5px 0;
color: #666;
}
.links a {
color: #0066cc;
text-decoration: none;
}
.file-list {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
.file-list th, .file-list td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
.file-list th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<div class="header">
<h1>Channel: {{channelName}}</h1>
<p>Release: {{releaseName}}</p>
<p>Date of Release: {{releaseDate}}</p>
<div class="links">
<a href="{{gitCommitLink}}">Git Commit</a> |
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
</div>
</div>
<table class="file-list">
<thead>
<tr>
<th>File Name</th>
<th>Size</th>
<th>SHA-256 Hash</th>
</tr>
</thead>
<tbody>
{{#each files}}
<tr>
<td>{{name}}</td>
<td>{{size}}</td>
<td>{{sha256}}</td>
</tr>
{{/each}}
</tbody>
</table>
</body>
</html>