Compare commits

...

9 commits
main ... main

Author SHA1 Message Date
6e4ae567a3 chore: fix tvix hashes
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-14 18:32:05 +01:00
a4443d8eac chore: bump locks
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-14 18:29:22 +01:00
a08351a9d4 feat(operations): add support for git pushing the channel
This time, we have almost a complete mechanism!

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-14 18:10:43 +01:00
a9ee947914 feat(streaming): add a command to cleanup failed streaming prefixes
And we also tuck them now in temp/ rather than in the top-level
directories.

Which makes it easier to clean them up.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-12-14 17:45:39 +01:00
23b6c38ed7 feat(script): enable x86_64-linux minimal ISO
Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-08-31 20:13:36 +02:00
6c79be63e7 Switch to HTTP tracing, get rid of openssl 2024-08-31 20:51:44 +03:00
3934bcbb39 fix(s3): perform the painful rename manually
god, s3 semantics are.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-08-31 19:48:22 +02:00
cb5a2a2b07 Merge pull request 'feat(*): init Rust port' (#1) from rust into main
Reviewed-on: the-distro/channel-scripts#1
2024-08-31 17:30:29 +00:00
542a3ae836 feat(*): init Rust port
This is a Rust port of the original Perl script, legacy cruft is removed
and it focuses on a modern Hydra deployment.

Nonetheless, it knows how to perform migrations based on the channel
versions.

Signed-off-by: Raito Bezarius <masterancpp@gmail.com>
2024-08-31 18:06:46 +02:00
18 changed files with 17029 additions and 71 deletions

5
.gitignore vendored
View file

@ -1 +1,6 @@
result
# Added by cargo
/target

3261
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

11944
Cargo.nix Normal file

File diff suppressed because it is too large Load diff

40
Cargo.toml Normal file
View file

@ -0,0 +1,40 @@
[package]
name = "mirror-forkos"
version = "0.1.0"
edition = "2021"
[dependencies]
clap = { version = "4.5.13", features = [ "derive" ] }
indicatif = { version = "0.17.8", features = ["futures", "tokio"] }
object_store = { version = "0.10.2", features = [ "aws" ] }
regex = "1.10.6"
reqwest = { version = "0.12.5", default-features = false, features = ["brotli", "http2", "rustls-tls-native-roots"] }
serde = "1.0.204"
tempdir = "0.3.7"
textwrap = "0.16.1"
tokio = { version = "1.39.2", features = ["full"] }
toml = "0.8.19"
nix-compat = { git = "https://github.com/tvlfyi/tvix" }
futures = "0.3.30"
anyhow = "1.0.86"
bytes = "1.7.1"
tokio-util = { version = "0.7.11", features = ["io"] }
termtree = "0.5.1"
walkdir = "2.5.0"
handlebars = "6.0.0"
serde_json = "1.0.127"
tracing = { version = "0.1.40", features = ["max_level_debug", "release_max_level_info"] }
zstd = "0.13.2"
tokio-stream = "0.1.15"
async-compression = { version = "0.4.12", features = ["tokio", "zstd"] }
digest = "0.10.7"
blake3 = { version = "1.5.4", features = ["traits-preview"] }
sha2 = "0.10.8"
hex = "0.4.3"
chrono = "0.4.38"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
opentelemetry = "0.24.0"
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "trace"] }
tracing-opentelemetry = { version = "0.25.0", features = ["metrics"] }
opentelemetry-otlp = { version = "0.17.0", features = ["http-json", "reqwest-client", "reqwest-rustls"] }
thiserror = "2.0.7"

View file

@ -1,3 +1,11 @@
(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
src = builtins.fetchGit ./.;
}).defaultNix
{ pkgs ? import <nixpkgs> {} }:
{
shell = pkgs.mkShell {
buildInputs = [
pkgs.cargo
pkgs.rustc
pkgs.openssl
pkgs.pkg-config
];
};
}

View file

@ -2,16 +2,16 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1717667611,
"narHash": "sha256-MA50QcNTCV3AUc+0RtyhHcYMAZ81ZkVypm/lHUq5BP0=",
"lastModified": 1734143535,
"narHash": "sha256-YVchPYuRpCFWqx6EVA1V1CY0NCTI1d3fADjOlB6oYe0=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "d1930fa7524da1d320d00eecba84e81810477f3a",
"rev": "6160d771fb09b838abefba72df27c0c32699fe45",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-24.05-small",
"ref": "nixos-unstable-small",
"repo": "nixpkgs",
"type": "github"
}

View file

@ -1,74 +1,17 @@
{
description = "Script for generating Nixpkgs/NixOS channels";
description = "Channel release management tools";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small";
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable-small";
outputs = { self, nixpkgs }:
{
overlays.default = final: prev: {
nixos-channel-native-programs = with final; stdenv.mkDerivation {
name = "nixos-channel-native-programs";
buildInputs = [
nix
pkg-config
boehmgc
nlohmann_json
boost
sqlite
];
buildCommand = let
nixHasSignalsHh = nixpkgs.lib.strings.versionAtLeast nix.version "2.19";
in ''
mkdir -p $out/bin
g++ -Os -g ${./index-debuginfo.cc} -Wall -std=c++14 -o $out/bin/index-debuginfo -I . \
$(pkg-config --cflags nix-main) \
$(pkg-config --libs nix-main) \
$(pkg-config --libs nix-store) \
-lsqlite3 \
${nixpkgs.lib.optionalString nixHasSignalsHh "-DHAS_SIGNALS_HH"}
'';
};
nixos-channel-scripts = with final; stdenv.mkDerivation {
name = "nixos-channel-scripts";
buildInputs = with perlPackages;
[ nix
sqlite
makeWrapper
perl
FileSlurp
LWP
LWPProtocolHttps
ListMoreUtils
DBDSQLite
NetAmazonS3
brotli
jq
nixos-channel-native-programs
nix-index
];
buildCommand = ''
mkdir -p $out/bin
cp ${./mirror-nixos-branch.pl} $out/bin/mirror-nixos-branch
wrapProgram $out/bin/mirror-nixos-branch \
--set PERL5LIB $PERL5LIB \
--set XZ_OPT "-T0" \
--prefix PATH : ${lib.makeBinPath [ wget git nix gnutar xz rsync openssh nix-index nixos-channel-native-programs ]}
patchShebangs $out/bin
'';
};
mirror-forkos = (final.callPackage ./Cargo.nix { }).rootCrate.build;
};
defaultPackage.x86_64-linux = (import nixpkgs {
system = "x86_64-linux";
overlays = [ self.overlays.default ];
}).nixos-channel-scripts;
}).mirror-forkos;
};
}

6
forkos.toml Normal file
View file

@ -0,0 +1,6 @@
hydra_uri = "https://hydra.forkos.org"
binary_cache_uri = "https://cache.forkos.org"
nixpkgs_dir = "/var/lib/nixpkgs"
s3_release_bucket_name = "bagel-channel-scripts-test"
s3_channel_bucket_name = "bagel-channel-scripts-test"
base_git_uri_for_revision = "https://cl.forkos.org/plugins/gitiles/nixpkgs/+"

View file

@ -1,3 +0,0 @@
(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
src = builtins.fetchGit ./.;
}).shellNix

612
src/actions.rs Normal file
View file

@ -0,0 +1,612 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::Context;
use bytes::Bytes;
use digest::DynDigest;
use futures::future::join_all;
use futures::AsyncReadExt;
use sha2::Digest;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use futures::Stream;
use futures::TryStreamExt;
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressDrawTarget;
use indicatif::ProgressStyle;
use tracing::debug;
use object_store::aws::AmazonS3;
use object_store::WriteMultipart;
use tempdir::TempDir;
use tracing::trace;
use crate::html::ReleaseFileData;
use crate::html::ReleasePageData;
use crate::hydra;
use crate::config;
const MULTIPART_CHUNK_SIZE: usize = 10 * 1024 * 1024; // 10 Mb
const ZSTD_COMPRESSION_LEVEL: i32 = 11; // Reasonable compression speed w.r.t. to the size of
// store-paths.
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, clap::ValueEnum)]
pub enum Checksum {
Sha256,
Blake3
}
impl FromStr for Checksum {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(match s {
"sha256" => Self::Sha256,
"blake3" => Self::Blake3,
_ => anyhow::bail!("unsupported checksum")
})
}
}
impl Checksum {
pub fn public_name(&self) -> &'static str {
match self {
Self::Sha256 => "SHA-256",
Self::Blake3 => "BLAKE-3"
}
}
fn into_dyn_digest(&self) -> Box<dyn DynDigest> {
match self {
Self::Sha256 => Box::new(sha2::Sha256::new()),
Self::Blake3 => Box::new(blake3::Hasher::new())
}
}
fn extension(&self) -> &'static str {
match self {
Self::Sha256 => "sha256",
Self::Blake3 => "b3"
}
}
}
#[derive(Debug, Copy, Clone)]
pub enum HydraProductType {
BrotliJson,
SourceDistribution,
}
#[derive(Debug)]
pub enum Compression {
Zstd,
None
}
impl Compression {
pub fn http_encoding(&self) -> Option<&str> {
Some(match self {
Self::Zstd => "zstd",
Self::None => return None,
})
}
pub fn file_extension(&self) -> Option<&str> {
Some(match self {
Self::Zstd => "zst",
Self::None => return None,
})
}
}
impl ToString for HydraProductType {
fn to_string(&self) -> String {
match self {
Self::BrotliJson => "json-br".to_string(),
Self::SourceDistribution => "source-dist".to_string(),
}
}
}
#[derive(Debug)]
pub enum Action {
WriteFile {
dst_path: PathBuf,
contents: Vec<u8>
},
WriteHydraProduct {
product_name: String,
dst_path: Option<PathBuf>,
product_type: Option<HydraProductType>,
},
WriteCompressedStorePaths {
compression: Compression
}
}
#[derive(Debug)]
pub struct Plan {
pub actions: Vec<Action>,
pub release: hydra::Release,
pub channel: hydra::Channel,
pub target_bucket: AmazonS3,
pub revision: String
}
#[derive(Debug)]
pub struct PlanOptions {
pub streaming: bool,
pub enable_progress_bar: bool,
pub requested_checksums: Vec<Checksum>
}
macro_rules! plan_write_file {
($plan_container:ident, $dst_path:literal, $contents:expr) => {
$plan_container.push(Action::WriteFile {
dst_path: $dst_path.into(),
contents: $contents
});
}
}
macro_rules! plan_write_hydra_product {
($plan_container:ident, $pname:expr) => {
$plan_container.push(Action::WriteHydraProduct {
product_name: $pname.into(),
dst_path: None,
product_type: None
});
};
($plan_container:ident, $dst_path:literal, $pname:expr) => {
$plan_container.push(Action::WriteHydraProduct {
dst_path: Some($dst_path.into()),
product_name: $pname.into(),
product_type: None
});
};
($plan_container:ident, $dst_path:literal, $pname:expr, $ptype:expr) => {
$plan_container.push(Action::WriteHydraProduct {
dst_path: Some($dst_path.into()),
product_name: $pname.into(),
product_type: Some($ptype)
});
};
}
macro_rules! plan_iso {
($plan_container:ident, $iso_name:literal, $arch:literal) => {
plan_write_hydra_product!($plan_container, format!("nixos.iso_{}.{}", $iso_name, $arch));
}
}
#[tracing::instrument]
pub async fn generate_plan(config: &config::MirrorConfig, channel: &hydra::Channel, release: &hydra::Release, evaluation: &hydra::Evaluation) -> Vec<Action> {
let mut plan = vec![];
plan.push(Action::WriteCompressedStorePaths {
compression: Compression::Zstd
});
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").unwrap() {
plan_write_file!(plan, "src-url", release.evaluation_url(&config.hydra_uri).into());
plan_write_file!(plan, "git-revision", revision.clone().into());
plan_write_file!(plan, "binary-cache-url", config.binary_cache_uri.clone().into());
if matches!(channel.r#type(), hydra::ChannelType::NixOS) {
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "nixos.channel");
plan_write_hydra_product!(plan, "packages.json.br", "nixpkgs.tarball",
HydraProductType::BrotliJson
);
plan_write_hydra_product!(plan, "options.json.br", "nixos.options",
HydraProductType::BrotliJson
);
// TODO: we don't have aarch64-linux yet
//plan_iso!(plan, "minimal", "aarch64-linux");
plan_iso!(plan, "minimal", "x86_64-linux");
if !matches!(channel.variant(), hydra::ChannelVariant::Small) {
// TODO: for some reason, those ISOs are not present in ForkOS hydra.
// TODO: implement error recovery, skipping or failure?
// maybe encode this in the plan (fallible, etc.)
// TODO: compat with nixos-20, 21, 22, 23 using plasma5?
// should we have plasma_latest?
//plan_iso!(plan, "plasma6", "aarch64-linux");
//plan_iso!(plan, "plasma6", "x86_64-linux");
//plan_iso!(plan, "gnome", "aarch64-linux");
//plan_iso!(plan, "gnome", "x86_64-linux");
// TODO: i686-linux ISO compat? for nixos ≤ 23.11
// FIXME: weird OVA appliance here, should investigate what to do
//plan_write_hydra_product!(plan, "nixos.ova.x86_64-linux");
}
} else {
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "tarball",
HydraProductType::SourceDistribution);
plan_write_hydra_product!(plan, "packages.json.br", "tarball",
HydraProductType::BrotliJson);
}
}
// TODO: take into account the SQLite database & debuginfo
plan
}
#[derive(Debug)]
pub enum PlanInterpretation {
/// The plan has been streamed to the target object storage
/// It's ready to be applied by promoting atomically this temporary object store path.
Streamed {
remote_path: object_store::path::Path,
target_path: object_store::path::Path
},
/// The plan has been prepared locally and can now be inspected or uploaded.
LocallyPlanned {
local_path: TempDir,
target_path: object_store::path::Path
}
}
pub async fn stream_to_multipart_writer<S, F>(mut input_stream: S, mut stream_callback: F, mut writer: WriteMultipart) -> std::io::Result<usize> where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
F: FnMut(&Bytes),
{
let mut streamed = 0;
while let Some(bytes) = input_stream.try_next().await? {
streamed += bytes.len();
stream_callback(&bytes);
writer.put(bytes);
}
writer.finish().await?;
Ok(streamed)
}
pub async fn stream_to_async_writer<S, F, W>(mut input_stream: S, _size_hint: Option<usize>, mut stream_callback: F, mut target_file: W) -> std::io::Result<usize> where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
F: FnMut(&Bytes),
W: AsyncWrite + Unpin
{
let mut streamed = 0;
while let Some(bytes) = input_stream.try_next().await? {
streamed += bytes.len();
target_file.write_all(&bytes).await?;
stream_callback(&bytes);
}
Ok(streamed)
}
pub async fn stream_to_bucket<S, F, O>(input_stream: S, size_hint: Option<usize>, target_path: &object_store::path::Path, mut stream_callback: F, bucket: &O) -> std::io::Result<usize> where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
F: FnMut(&Bytes),
O: object_store::ObjectStore
{
let uses_multipart = if let Some(size) = size_hint {
size >= MULTIPART_CHUNK_SIZE
} else { true }; // Worst case is that the stream is bigger than memory.
if uses_multipart {
// TODO: propagate error
let mpart = bucket.put_multipart(target_path).await.unwrap();
let writer = WriteMultipart::new_with_chunk_size(mpart, MULTIPART_CHUNK_SIZE);
stream_to_multipart_writer(input_stream, stream_callback, writer).await
} else {
let mut buffer = if let Some(size) = size_hint {
Vec::with_capacity(size)
} else {
// Constantify
Vec::with_capacity(8192)
};
// TODO: find a way to propagate the progress bar here.
// call the stream callback otherwise we cannot hash entire files.
input_stream.into_async_read().read_to_end(&mut buffer).await?;
let actual_length = buffer.len();
// TODO: generalize the stream callback argument
// it's not that bad because it's usually very small here.
stream_callback(&Bytes::copy_from_slice(&buffer));
bucket.put(target_path, buffer.into()).await?;
Ok(actual_length)
}
}
pub async fn stream_to_target_nock<S, F, O>(input_stream: S,
size_hint: Option<usize>,
key_prefix: &str, key: &str,
stream_callback: F,
bucket: &O, streaming: bool) -> std::io::Result<usize> where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
F: FnMut(&Bytes),
O: object_store::ObjectStore {
if streaming {
let bucket_path: object_store::path::Path = format!("{}/{}", key_prefix, key).into();
stream_to_bucket(input_stream, size_hint, &bucket_path, stream_callback, bucket).await
} else {
let fs_path: PathBuf = PathBuf::from(key_prefix).join(key);
let tgt_file = tokio::fs::File::create(&fs_path).await?;
stream_to_async_writer(input_stream, size_hint, stream_callback, tgt_file).await
}
}
pub async fn stream_to_target<S, F, O>(input_stream: S,
size_hint: Option<usize>,
key_prefix: &str, key: &str,
stream_callback: F,
bucket: &O, streaming: bool,
requested_checksums: Vec<Checksum>) -> std::io::Result<(usize, HashMap<Checksum, Box<[u8]>>)> where
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
F: Fn(&Bytes),
O: object_store::ObjectStore {
let mut hashers: HashMap<Checksum, Box<dyn DynDigest>> = HashMap::from_iter(
requested_checksums.into_iter().map(|checksum| (checksum, checksum.into_dyn_digest()))
);
let copied = stream_to_target_nock(input_stream, size_hint, key_prefix, key, |bytes| {
hashers.iter_mut().for_each(|(_, hasher)| {
hasher.update(&bytes);
});
stream_callback(&bytes);
}, bucket, streaming).await?;
// Stream all the checksums as well.
let keys: HashMap<Checksum, String> = hashers.iter().map(|(ck, _)| (ck.clone(), format!("{}.{}", key, ck.extension()))).collect();
let checksums: HashMap<Checksum, Box<[u8]>> = hashers.into_iter().map(|(ck, hasher)| (ck, hasher.finalize())).collect();
join_all(checksums.iter().map(|(ck, digest)| {
let key = keys.get(ck).unwrap();
let hexdigest = hex::encode(digest);
let ck_stream = tokio_stream::once(
Ok(Bytes::from_iter(hexdigest.into_bytes().into_iter()))
);
stream_to_target_nock(ck_stream,
Some(digest.as_ref().len()),
key_prefix,
key,
|bytes| {
trace!("streaming the checksum ({:?}); chunk size = {}", ck.clone(), bytes.len());
}, bucket, streaming)
})).await;
Ok((copied, checksums))
}
/// Interprets the plan as a list of action.
///
/// It creates a staging directory where all actions are prepared.
/// No irreversible action is committed.
///
/// At the end, it returns the staging directory for further post-processing, including possible
/// uploads.
///
/// Enabling streaming will also directly stream the contents of the staging directory to the
/// target bucket in a temporary suffix of the target prefix.
/// Caller is responsible to promote that upload to a finalized upload.
#[tracing::instrument]
pub async fn interpret_plan(hydra_client: &hydra::HydraClient<'_>, plan: Plan, plan_opts: PlanOptions) -> PlanInterpretation {
// Prepare a staging directory
let staging_directory = TempDir::new(format!("staging-{}", &plan.channel.name).as_str()).expect("Failed to create temporary directory for staging directory");
let staging_prefix = format!("temp/{}", staging_directory.path().file_name()
.expect("Failed to obtain the final component of the staging directory")
.to_str()
.expect("Failed to convert the final component into string")
.to_owned());
let streaming = plan_opts.streaming;
let key_prefix = if streaming {
staging_prefix.to_string()
} else {
staging_directory.path().to_str().expect("Staging directory path is not valid UTF-8").to_owned()
};
if streaming {
debug!("Streaming the update to /{}", staging_prefix);
} else {
debug!("Preparing locally the update in {}", staging_directory.path().display());
}
// Initialize multiple progress bars if we are to show progress bars.
let multi_pbar = MultiProgress::with_draw_target(if plan_opts.enable_progress_bar {
ProgressDrawTarget::stdout()
} else {
ProgressDrawTarget::hidden()
});
// Pending download/upload/streaming tasks.
let mut pending = Vec::new();
for action in plan.actions {
let task = async {
let pbar = ProgressBar::new(0);
multi_pbar.add(pbar.clone());
pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
let requested_checksums = plan_opts.requested_checksums.clone();
match action {
Action::WriteHydraProduct { product_name, dst_path, product_type } => {
let dst_path = dst_path.unwrap_or_else(|| product_name.clone().into());
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths needs to be relative in order for them to be prefixed by the staging directory");
let key = dst_path.to_str().expect("Failed to convert relative destination path to a UTF-8 key");
let (size, stream) = hydra_client.hydra_product_bytes_stream(&plan.release, &product_name, product_type).await
.context("while preparing the stream for a Hydra product")
.expect("Failed to stream an Hydra product");
pbar.set_length(size);
if streaming {
pbar.set_message(format!("Downloading and streaming '{}'", product_name));
} else {
pbar.set_message(format!("Copying '{}'", product_name));
}
let (copied, checksums) = stream_to_target(stream, Some(size as usize), &key_prefix, key, |bytes| {
// This is not really called when the multipart
// has completed pushing this part.
// In practice, if we are pushing as soon as we read,
// we expect the bytes chunk to be small
// and therefore, this is fairly reasonable.
trace!("multipart streaming: chunk size = {}", bytes.len());
pbar.inc(bytes.len() as u64);
}, &plan.target_bucket, streaming, requested_checksums).await
.context(
format!("while streaming to the bucket the product {}", product_name))
.unwrap();
pbar.finish_with_message(format!("Streamed '{}'", product_name));
debug!("Copied {} bytes", copied);
return ReleaseFileData::new(key, size, checksums);
},
Action::WriteFile { dst_path, contents } => {
// We don't need a progress bar here.
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths needs to be relative in order for them to be prefixed by the staging directory");
let key = dst_path.to_str().expect("Failed to convert a relative path to a UTF-8 key");
let size = contents.len();
let stream = tokio_stream::once(Ok(Bytes::from_iter(contents.into_iter())));
pbar.set_length(size as u64);
if streaming {
pbar.set_message(format!("Streaming '{}'", key));
} else {
pbar.set_message(format!("Writing '{}'", key));
}
let (copied, checksums) = stream_to_target(stream, Some(size), &key_prefix, key, |bytes| {
// This is not really called when the multipart
// has completed pushing this part.
// In practice, if we are pushing as soon as we read,
// we expect the bytes chunk to be small
// and therefore, this is fairly reasonable.
trace!("multipart streaming: chunk size = {}", bytes.len());
pbar.inc(bytes.len() as u64);
}, &plan.target_bucket, streaming, requested_checksums).await
.context(
format!("while streaming to the bucket the following key '{}'", key)
).unwrap();
pbar.finish_with_message(format!("Written '{}'", key));
debug!("Written {} bytes", copied);
return ReleaseFileData::new(key, size as u64, checksums);
},
Action::WriteCompressedStorePaths { compression } => {
let spaths = hydra_client.fetch_store_paths(&plan.release).await.context("while fetching store paths").expect("Failed to fetch store paths from Hydra");
let mut serialized = serde_json::to_vec(&spaths).context("while serializing store paths to JSON").expect("failed to serialize");
let key;
if matches!(compression, Compression::Zstd) {
let compressed_serialization = zstd::bulk::compress(&serialized, ZSTD_COMPRESSION_LEVEL).expect("Failed to compress the store-paths");
let _ = std::mem::replace(&mut serialized, compressed_serialization);
key = "store-paths.zst";
} else {
key = "store-paths";
}
let size = serialized.len();
let stream = tokio_stream::once(Ok(Bytes::from_iter(serialized.into_iter())));
pbar.set_length(size as u64);
if streaming {
pbar.set_message("Streaming compressed store paths");
} else {
pbar.set_message("Writing compressed store paths");
}
let (copied, checksums) = stream_to_target(stream, Some(size), &key_prefix, key, |bytes| {
// This is not really called when the multipart
// has completed pushing this part.
// In practice, if we are pushing as soon as we read,
// we expect the bytes chunk to be small
// and therefore, this is fairly reasonable.
trace!("multipart streaming: chunk size = {}", bytes.len());
pbar.inc(bytes.len() as u64);
}, &plan.target_bucket, streaming, requested_checksums).await
.context(
format!("while streaming to the bucket the following key '{}'", key)
).unwrap();
pbar.finish_with_message("Compressed store paths streamed");
debug!("Written {} bytes", copied);
return ReleaseFileData::new(key, size as u64, checksums);
},
};
};
pending.push(task);
}
let files: Vec<ReleaseFileData> = join_all(pending).await;
let target_path = plan.release.prefix(&plan.channel);
let rendered_page = crate::html::render_release_page(ReleasePageData {
channel_name: plan.channel.name.clone(),
release_name: plan.release.nix_name.clone(),
release_date: chrono::Utc::now().format("%Y-%m-%d %H-%M UTC").to_string(),
git_commit_link: format!("{}/{}", hydra_client.config.base_git_uri_for_revision, plan.revision),
git_revision: plan.revision,
// TODO: mega evil, clean up and expose config properly.
hydra_eval_link: plan.release.evaluation_url(&hydra_client.config.hydra_uri),
hydra_eval_id: plan.release.evaluation_id(),
checksums: plan_opts.requested_checksums.iter().map(|ck| ck.public_name().to_string()).collect(),
files
}).expect("Failed to render a release page");
let pbar = ProgressBar::new(0);
multi_pbar.add(pbar.clone());
pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
let key = if streaming {
plan.release.nix_name
} else {
format!("{}.html", plan.release.nix_name)
};
let contents = rendered_page.into_bytes();
let size = contents.len();
let stream = tokio_stream::once(Ok(Bytes::from_iter(contents.into_iter())));
pbar.set_length(size as u64);
if streaming {
pbar.set_message("Streaming the release HTML page");
} else {
pbar.set_message("Writing the release HTML page");
}
let (copied, _) = stream_to_target(stream, Some(size), &key_prefix, &key, |bytes| {
// This is not really called when the multipart
// has completed pushing this part.
// In practice, if we are pushing as soon as we read,
// we expect the bytes chunk to be small
// and therefore, this is fairly reasonable.
trace!("multipart streaming: chunk size = {}", bytes.len());
pbar.inc(bytes.len() as u64);
}, &plan.target_bucket, streaming, Vec::new()).await
.context(
format!("while streaming to the bucket the following key '{}'", key)
).unwrap();
debug!("Release HTML page sent; copied {} bytes", copied);
pbar.finish_with_message("Release HTML page streamed");
if streaming {
staging_directory.close().expect("Failed to get rid of the staging directory");
PlanInterpretation::Streamed {
remote_path: staging_prefix.into(),
target_path
}
} else {
PlanInterpretation::LocallyPlanned {
local_path: staging_directory,
target_path
}
}
}

36
src/config.rs Normal file
View file

@ -0,0 +1,36 @@
use object_store::aws::{AmazonS3, AmazonS3Builder};
use serde::Deserialize;
use std::path::PathBuf;
#[derive(Debug, Deserialize)]
pub struct MirrorConfig {
/// URI to Hydra instance
pub hydra_uri: String,
/// URI to the binary cache
pub binary_cache_uri: String,
/// Base URI to access to a web interface pertaining to a specific Git revision of the nixpkgs
/// input
pub base_git_uri_for_revision: String,
/// A path to a checkout of the target repository
pub repo_dir: PathBuf,
/// S3 releases bucket name
pub s3_release_bucket_name: String,
/// S3 channels bucket name
s3_channel_bucket_name: String,
}
impl MirrorConfig {
pub fn release_bucket(&self) -> AmazonS3 {
AmazonS3Builder::from_env()
.with_bucket_name(&self.s3_release_bucket_name)
.build()
.expect("Failed to connect to the S3 release bucket")
}
pub fn channel_bucket(&self) -> AmazonS3 {
AmazonS3Builder::from_env()
.with_bucket_name(&self.s3_channel_bucket_name)
.build()
.expect("Failed to connect to the S3 channel bucket")
}
}

131
src/git.rs Normal file
View file

@ -0,0 +1,131 @@
/// Very simple wrappers for Git shelling out.
use std::path::PathBuf;
use std::process::ExitStatus;
use std::io;
use thiserror::Error;
use tokio::process::Command;
use tokio::io::{BufReader, AsyncBufReadExt};
use tokio::sync::mpsc::{channel, Receiver};
use tokio::task::JoinHandle;
#[derive(Error, Debug)]
pub enum GitError {
#[error("Git command failed with status: {0}")]
CommandFailed(ExitStatus),
#[error("IO error occurred: {0}")]
IoError(#[from] io::Error),
#[error("Failed to capture {0}")]
StreamCaptureError(String),
#[error("Failed to wait for process to complete")]
WaitError,
}
/// Encapsulates the result of a Git command execution
pub struct GitCommand {
pub stdout: Receiver<String>,
pub stderr: Receiver<String>,
pub handle: JoinHandle<Result<(), GitError>>,
}
impl GitCommand {
/// Waits for the Git command to complete and returns the result.
pub async fn wait(self) -> Result<(), GitError> {
self.handle.await.unwrap_or_else(|_| Err(GitError::WaitError))
}
}
/// Git operations structure
pub struct Git {
repo: PathBuf,
remote: String,
}
impl Git {
pub fn new(repo: impl Into<PathBuf>, remote: impl Into<String>) -> Self {
Self {
repo: repo.into(),
remote: remote.into(),
}
}
pub fn default(repo: impl Into<PathBuf>) -> Self {
Self::new(repo, "origin")
}
pub async fn push(&self, rev: &str, channel_name: &str) -> Result<GitCommand, GitError> {
// Run "git remote update"
self.run_git_command(&["remote", "update", &self.remote]).await?;
// Prepare the "git push" command
let push_args = ["push", &self.remote, &format!("{rev}:refs/heads/{channel_name}")];
let command = self.run_git_command_with_streams(&push_args).await?;
Ok(command)
}
async fn run_git_command(&self, args: &[&str]) -> Result<(), GitError> {
let status = Command::new("git")
.args(args)
.current_dir(&self.repo)
.stderr(std::process::Stdio::inherit())
.stdout(std::process::Stdio::inherit())
.status()
.await?;
if !status.success() {
return Err(GitError::CommandFailed(status));
}
Ok(())
}
async fn run_git_command_with_streams(
&self,
args: &[&str],
) -> Result<GitCommand, GitError> {
let mut child = Command::new("git")
.args(args)
.current_dir(&self.repo)
.stderr(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().ok_or_else(|| GitError::StreamCaptureError("stdout".into()))?;
let stderr = child.stderr.take().ok_or_else(|| GitError::StreamCaptureError("stderr".into()))?;
let stdout_rx = Self::stream_output(stdout);
let stderr_rx = Self::stream_output(stderr);
// Spawn a task to wait for the command's completion
let handle = tokio::spawn(async move {
let status = child.wait().await.map_err(|e| GitError::IoError(e))?;
if !status.success() {
Err(GitError::CommandFailed(status))
} else {
Ok(())
}
});
Ok(GitCommand {
stdout: stdout_rx,
stderr: stderr_rx,
handle,
})
}
fn stream_output(
stream: impl tokio::io::AsyncRead + Unpin + Send + 'static,
) -> Receiver<String> {
let (tx, rx) = channel(100);
let reader = BufReader::new(stream);
tokio::spawn(async move {
let mut lines = reader.lines();
while let Ok(Some(line)) = lines.next_line().await {
if tx.send(line).await.is_err() {
break;
}
}
});
rx
}
}

49
src/html.rs Normal file
View file

@ -0,0 +1,49 @@
use std::collections::HashMap;
use handlebars::Handlebars;
use indicatif::HumanBytes;
use serde::Serialize;
use crate::actions::Checksum;
#[derive(Debug, Serialize)]
pub struct ReleaseFileData {
pub name: String,
pub size: String,
pub checksums: HashMap<String, String>
}
impl ReleaseFileData {
pub fn new(name: &str, size: u64, checksums: HashMap<Checksum, Box<[u8]>>) -> Self {
ReleaseFileData {
name: name.to_string(),
size: HumanBytes(size).to_string(),
checksums: checksums.into_iter().map(|(ck, digest)| {
(ck.public_name().to_string(),
hex::encode(digest))
}).collect()
}
}
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ReleasePageData {
pub channel_name: String,
pub release_name: String,
pub release_date: String,
pub git_commit_link: String,
pub git_revision: String,
pub hydra_eval_link: String,
pub hydra_eval_id: u64,
pub files: Vec<ReleaseFileData>,
pub checksums: Vec<String>
}
pub fn render_release_page(page_data: ReleasePageData) -> Result<String, handlebars::RenderError> {
let reg = Handlebars::new();
let release_template = include_str!("./release.tpl");
reg.render_template(&release_template, &page_data)
}

338
src/hydra.rs Normal file
View file

@ -0,0 +1,338 @@
use std::{collections::HashMap, path::PathBuf};
use anyhow::Context;
use bytes::Bytes;
use futures::Stream;
use indicatif::ProgressBar;
use nix_compat::store_path::{StorePath, StorePathRef};
use regex::Regex;
use serde::{de::Error, Deserialize, Deserializer};
use tokio::io::AsyncWrite;
use crate::{actions::HydraProductType, config::MirrorConfig};
pub type ReleaseId = u64;
pub type EvaluationId = u64;
pub type BuildId = u64;
fn deser_bool_as_int<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
u8::deserialize(deserializer).map(|b| if b == 1 { true } else { false })
}
fn deser_bool_as_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where D: Deserializer<'de>
{
let s: &str = Deserialize::deserialize(deserializer)?;
match s {
"false" => Ok(false),
"true" => Ok(true),
_ => Err(Error::unknown_variant(s, &["false", "true"]))
}
}
#[derive(Debug, Deserialize)]
pub struct OutputPath {
#[serde(rename="path")]
store_path: String
}
#[derive(Debug, Deserialize)]
pub struct BuildProduct {
#[serde(rename="path")]
store_path: String,
name: String,
subtype: String,
sha256hash: String,
r#type: String,
#[serde(rename="filesize")]
file_size: u64
}
#[derive(Debug, Deserialize)]
pub struct BuildInfo {
jobset: String,
job: String,
project: String,
#[serde(deserialize_with = "deser_bool_as_int", rename="buildstatus")]
finished: bool,
#[serde(rename="releasename")]
release_name: Option<String>,
#[serde(rename="stoptime")]
stop_time: u64,
#[serde(rename="starttime")]
start_time: u64,
system: String,
id: BuildId,
#[serde(rename="buildoutputs")]
build_outputs: HashMap<String, OutputPath>,
#[serde(rename="buildproducts")]
build_products: HashMap<String, BuildProduct>,
#[serde(rename="nixname")]
nix_name: String,
#[serde(rename="drvpath")]
drv_path: String,
}
#[derive(Debug, Clone)]
pub struct Channel {
pub name: String
}
#[non_exhaustive]
pub enum ChannelType {
NixOS,
Nixpkgs,
}
#[non_exhaustive]
pub enum ChannelVariant {
Normal,
Small
}
impl Channel {
pub fn version(&self) -> String {
let re = Regex::new("([a-z]+)-(?<ver>.*)").unwrap();
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
caps["ver"].to_string()
}
pub fn prefix(&self) -> String {
let re = Regex::new("(?<name>[a-z]+)-(.*)").unwrap();
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
caps["name"].to_string()
}
// TODO: just regex match?
pub fn r#type(&self) -> ChannelType {
ChannelType::NixOS
}
pub fn variant(&self) -> ChannelVariant {
ChannelVariant::Normal
}
}
#[derive(Deserialize, Debug)]
pub struct Release {
pub id: ReleaseId,
job: String,
#[serde(rename = "releasename")]
release_name: Option<String>,
#[serde(rename = "starttime")]
start_time: u64,
#[serde(rename = "stoptime")]
stop_time: u64,
#[serde(rename = "nixname")]
pub nix_name: String,
#[serde(rename = "jobsetevals")]
jobset_evals: Vec<EvaluationId>,
jobset: String,
#[serde(deserialize_with = "deser_bool_as_int")]
finished: bool,
priority: u64,
system: String,
timestamp: u64,
project: String,
#[serde(rename = "drvpath")]
derivation_path: String,
// ignored: buildproducts, buildoutputs, buildmetrics, buildstatus
}
#[derive(Debug)]
pub struct GitInput {
uri: String,
revision: String
}
// FIXME(Raito): for some reason, #[serde(tag = "type"), rename_all = "lowercase"] doesn't behave
// correctly, and causes deserialization failures in practice. `untagged` is suboptimal but works
// because of the way responses works...
#[derive(Debug, Deserialize)]
#[serde(untagged, expecting = "An valid jobset input")]
pub enum Input {
Boolean {
#[serde(deserialize_with = "deser_bool_as_string")]
value: bool
},
Git { uri: String, revision: String },
Nix { value: String },
}
#[derive(Deserialize, Debug)]
pub struct Evaluation {
pub id: EvaluationId,
#[serde(rename="checkouttime")]
checkout_time: u64,
#[serde(rename="evaltime")]
eval_time: u64,
flake: Option<String>,
#[serde(rename="jobsetevalinputs", default)]
pub jobset_eval_inputs: HashMap<String, Input>,
timestamp: u64,
builds: Vec<BuildId>,
}
impl Release {
pub fn version(&self) -> String {
let re = Regex::new(".+-(?<ver>[0-9].+)").unwrap();
let caps = re.captures(&self.nix_name).expect("Failed to parse the release name");
caps["ver"].to_string()
}
pub fn evaluation_id(&self) -> u64 {
*self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?")
}
pub fn evaluation_url(&self, hydra_base_uri: &str) -> String {
let eval_id = self.evaluation_id();
format!("{}/eval/{}", hydra_base_uri, eval_id)
}
pub fn job_url(&self, hydra_base_uri: &str, job_name: &str) -> String {
let eval_id = self.evaluation_id();
format!("{}/eval/{}/job/{}", hydra_base_uri, eval_id, job_name)
}
pub fn store_paths_url(&self, hydra_base_uri: &str) -> String {
let eval_id = self.evaluation_id();
format!("{}/eval/{}/store-paths", hydra_base_uri, eval_id)
}
/// Directory related to this release.
fn directory(&self, channel: &Channel) -> String {
match channel.name.as_str() {
"nixpkgs-unstable" => "nixpkgs".to_string(),
_ => format!("{}/{}", channel.prefix(), channel.version())
}
}
pub fn prefix(&self, channel: &Channel) -> object_store::path::Path {
format!("{}/{}", self.directory(channel), self.nix_name).into()
}
}
pub fn release_uri(hydra_uri: &str, job_name: &str) -> String {
format!("{}/job/{}/latest", hydra_uri, job_name)
}
#[derive(Debug)]
pub struct HydraClient<'a> {
pub config: &'a MirrorConfig,
}
impl HydraClient<'_> {
pub async fn fetch_release(&self, job_name: &str) -> reqwest::Result<Release> {
let client = reqwest::Client::new();
let resp = client.get(release_uri(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
// TODO: put a proper version
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
pub async fn fetch_evaluation(&self, release: &Release) -> reqwest::Result<Evaluation> {
let client = reqwest::Client::new();
let resp = client.get(release.evaluation_url(&self.config.hydra_uri))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
pub async fn fetch_store_paths(&self, release: &Release) -> anyhow::Result<Vec<StorePath<String>>> {
let client = reqwest::Client::new();
Ok(client.get(release.store_paths_url(&self.config.hydra_uri))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading store-paths information for an evaluation"))?
)
}
pub async fn hydra_product_bytes_stream(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
let client = reqwest::Client::new();
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
for (_, product) in build_info.build_products {
if products_by_subtype.contains_key(&product.subtype) {
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
}
products_by_subtype.insert(product.subtype.clone(), product);
}
if products_by_subtype.len() > 1 && product_type.is_none() {
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
}
let product: BuildProduct = if let Some(ptype) = product_type {
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
} else {
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
};
// 3. FIXME: validate sha256hash during read?
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
crate::nar::file_in_nar_bytes_stream(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap()).await.context("while copying the NAR to the target")
}
pub async fn copy_hydra_product<W: AsyncWrite + Unpin>(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
// TODO: dry me?
let client = reqwest::Client::new();
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
.header("Accept", "application/json")
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?
.json()
.await
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
for (_, product) in build_info.build_products {
if products_by_subtype.contains_key(&product.subtype) {
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
}
products_by_subtype.insert(product.subtype.clone(), product);
}
if products_by_subtype.len() > 1 && product_type.is_none() {
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
}
let product: BuildProduct = if let Some(ptype) = product_type {
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
} else {
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
};
// 3. FIXME: validate sha256hash during read?
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
crate::nar::copy_file_in_nar(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap(), pbar, out).await.context("while copying the NAR to the target")
}
}

321
src/main.rs Normal file
View file

@ -0,0 +1,321 @@
mod config;
mod actions;
mod hydra;
mod git;
mod nar;
mod html;
use std::path::PathBuf;
use actions::{interpret_plan, Plan, PlanInterpretation, PlanOptions};
use chrono::Duration;
use clap::{Subcommand, Parser, Args};
use hydra::{Channel, Evaluation, HydraClient, Release};
use tokio_stream::StreamExt;
use tracing::{info, trace, warn};
use object_store::{aws::AmazonS3, ObjectStore};
use tempdir::TempDir;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use opentelemetry::trace::TracerProvider;
#[derive(Debug, Args)]
struct ChannelArgs {
/// Channel name to update
channel_name: String,
/// Job name to fetch from the Hydra instance configured
job_name: String,
}
#[derive(Debug, Args)]
struct RemoteArgs {
/// Channel name to update
channel_name: String,
/// Staging prefix
staging_prefix: String
}
#[derive(Debug, Args)]
struct GlobalOpts {
/// TOML configuration file for channel updates
#[arg(short, long)]
config_file: PathBuf,
/// Whether to execute no remote side effects (S3 uploads, redirections), etc.
#[arg(short, long, default_value_t = false)]
dry_run: bool,
/// Whether to bypass all preflight checks
#[arg(short, long, default_value_t = false)]
bypass_preflight_checks: bool,
/// Enable or disable progress bar reports
#[arg(short, long, default_value_t = true)]
enable_progress_bar: bool,
/// Stream the plan instead of preparing it in a local directory
#[arg(short, long, default_value_t = true)]
stream: bool,
#[arg(long, default_values = &["sha256", "blake3"])]
checksum: Vec<actions::Checksum>
}
#[derive(Debug, Parser)]
#[command(version, about, long_about = None)]
struct App {
#[command(flatten)]
global_opts: GlobalOpts,
#[command(subcommand)]
command: Commands
}
#[derive(Debug, Subcommand)]
enum Commands {
/// Print the plan for the given channel name and job name
Plan(ChannelArgs),
/// Apply the plan that would be generated for the given channel name and job name
Apply(ChannelArgs),
/// Apply an existing remote plan that was planned in the past
ApplyRemote(RemoteArgs),
/// Clean up failed streamed prefixes
CleanupStreamedPrefixes,
}
#[derive(Debug)]
struct PreflightCheckContext<'a> {
target_channel: &'a Channel,
new_release: &'a Release,
new_evaluation: &'a Evaluation,
current_release_path: Option<object_store::path::Path>,
}
#[tracing::instrument]
async fn run_preflight_checks(channel: &Channel, release: &Release, evaluation: &Evaluation, channel_bucket: AmazonS3) -> bool {
info!("Running pre-flight checks...");
let channel_name = object_store::path::Path::parse(&channel.name).expect("Channel name should be a valid S3 path");
let mut context = PreflightCheckContext {
target_channel: channel,
new_release: release,
new_evaluation: evaluation,
current_release_path: None
};
match channel_bucket.get_opts(&channel_name, object_store::GetOptions { head: true, ..Default::default() }).await {
Ok(object_store::GetResult { attributes, meta, .. }) => {
info!("Release found: {:?}", meta);
trace!("Attributes: {:#?}", attributes);
if let Some(redirection_target) = attributes.get(&object_store::Attribute::Metadata("x-amz-website-redirect-location".into())) {
context.current_release_path = Some(redirection_target.to_string().into());
}
},
Err(err) => {
warn!("Error while asking the channel bucket: {}", err);
todo!();
}
}
trace!("Preflight context check assembled: {:?}", context);
// TODO: run anti rollback protection
true
}
#[derive(Debug)]
enum PlanResult {
AlreadyExecuted {
remote_path: object_store::path::Path
},
LocallyPrepared {
local_path: TempDir,
target_path: object_store::path::Path
},
StreamedRemotely {
remote_path: object_store::path::Path,
target_path: object_store::path::Path
},
}
#[derive(Debug)]
struct PlanMetadata {
/// Git SHA1 induced by this plan
revision: String
}
impl From<PlanInterpretation> for PlanResult {
fn from(value: PlanInterpretation) -> Self {
match value {
PlanInterpretation::Streamed { remote_path, target_path } => PlanResult::StreamedRemotely {
remote_path,
target_path
},
PlanInterpretation::LocallyPlanned { local_path, target_path } => PlanResult::LocallyPrepared {
local_path,
target_path,
},
}
}
}
#[tracing::instrument]
async fn move_directory<OS: object_store::ObjectStore>(client: OS, from: &object_store::path::Path, to: &object_store::path::Path) -> std::io::Result<()> {
let prefix = from.to_string();
while let Some(object) = client.list(Some(from)).try_next().await? {
let source = object.location.to_string();
let stripped_object_path = source.strip_prefix(&prefix).expect(
"Failed to strip the `from` prefix during move directory"
);
let target = format!("{}/{}", to.to_string(), stripped_object_path);
client.rename(&object.location, &target.into()).await?;
}
Ok(())
}
#[tracing::instrument]
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<(PlanResult, PlanMetadata)> {
let chan = Channel {
name: channel_name.to_string()
};
info!("Planning for channel {} using job {}", &chan.name, &job_name);
let release = hydra_client.fetch_release(&job_name)
.await.expect("Failed to fetch release");
trace!("{:#?}", release);
let evaluation = hydra_client.fetch_evaluation(&release)
.await.expect("Failed to fetch evaluation");
trace!("{:?}", evaluation.jobset_eval_inputs);
let plan_options = PlanOptions {
streaming: global_opts.stream,
enable_progress_bar: global_opts.enable_progress_bar,
requested_checksums: global_opts.checksum.clone()
};
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").expect("Expected a nixpkgs repository") {
info!("Release information:\n- Release is: {} (build {})\n- Eval is: {}\n- Prefix is: {}\n- Git commit is {}\n",
release.nix_name, release.id, evaluation.id, release.prefix(&chan), revision);
let release_bucket = config.release_bucket();
let metadata = PlanMetadata {
revision: revision.to_owned()
};
// If the release already exists, skip it.
if release_bucket.head(&release.prefix(&chan)).await.is_ok() {
tracing::warn!("Release already exists, skipping");
return Ok((PlanResult::AlreadyExecuted {
remote_path: release.prefix(&chan)
}, metadata));
}
let channel_bucket = config.channel_bucket();
if global_opts.bypass_preflight_checks || run_preflight_checks(&chan, &release, &evaluation, channel_bucket).await {
let actions = actions::generate_plan(&config, &chan, &release, &evaluation).await;
tracing::debug!("Action plan: {:#?}", actions);
let interpretation = interpret_plan(&hydra_client, Plan {
actions,
release,
channel: chan.clone(),
target_bucket: config.release_bucket(),
revision: revision.to_string(),
}, plan_options).await;
info!("Plan interpreted: {:?}", interpretation);
Ok((interpretation.into(), metadata))
} else {
tracing::error!("Preflight check failed, cannot continue, pass `--force` if you want to bypass preflight checks");
Err(std::io::Error::other("preflight check failure"))
}
} else {
panic!("Nixpkgs input is not of type Git");
}
}
#[tracing::instrument]
async fn sync_channel_over_git(repo: &PathBuf, plan_metadata: &PlanMetadata, branch_name: &str) -> Result<(), ()> {
let repo = git::Git::default(repo);
// TODO: better error handling
Ok(repo.push(&plan_metadata.revision, &branch_name).await
.expect("Failed to prepare the push")
.wait()
.await
.expect("Failed to push"))
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
let args = App::parse();
let config: config::MirrorConfig = toml::from_str(&std::fs::read_to_string(&args.global_opts.config_file)
.expect("Failed to read the configuration file"))
.expect("Failed to deserialize the configuration file");
let hydra_client: HydraClient = HydraClient {
config: &config
};
let trace_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
opentelemetry::KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
])))
.with_exporter(opentelemetry_otlp::new_exporter().http())
.install_batch(opentelemetry_sdk::runtime::Tokio)
.expect("Couldn't create OTLP tracer");
opentelemetry::global::set_tracer_provider(trace_provider.clone());
let fmt_layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(fmt_layer)
.with(OpenTelemetryLayer::new(trace_provider.tracer("channel-scripts")))
.init();
match args.command {
Commands::Plan(channel) => {
plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
},
Commands::Apply(channel) => {
let (plan, metadata) = plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
match plan {
PlanResult::AlreadyExecuted { .. } => {
info!("Plan was already executed, no further action required");
},
PlanResult::LocallyPrepared { .. } => todo!("Uploading a plan is a todo; use streaming!"),
PlanResult::StreamedRemotely { remote_path, target_path } => {
info!("Plan was streamed remotely successfully to '{}@{}', replacing {}", config.s3_release_bucket_name, remote_path, target_path);
move_directory(config.release_bucket(), &remote_path, &target_path)
.await.expect("Failed to atomically promote the streamed channel, aborting");
}
}
sync_channel_over_git(&config.repo_dir, &metadata, &channel.channel_name).await.expect("Failed to synchronize the Git repository for this channel");
// TODO: redirects all URIs
},
Commands::ApplyRemote(remote) => {
let release = config.release_bucket();
// TODO: fetch the release and obtain the right name.
move_directory(config.release_bucket(), &remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
.await.expect("Failed to atomically rename the staging prefix into the channel name");
todo!("Remote apply does not support synchronizing the Git repository yet. We need to fetch the remote revision and channel name to perform it.");
// TODO: redirects all URIs
},
Commands::CleanupStreamedPrefixes => {
let channel = config.channel_bucket();
let cutoff_date = chrono::Utc::now() - Duration::days(1);
// List all failed prefixes in temp/
let temp_prefix = object_store::path::Path::from("temp/");
while let Some(failed_or_pending_prefix) = channel.list(Some(&temp_prefix)).try_next().await? {
if failed_or_pending_prefix.last_modified <= cutoff_date {
channel.delete(&failed_or_pending_prefix.location).await?;
}
}
},
}
Ok(())
}

117
src/nar.rs Normal file
View file

@ -0,0 +1,117 @@
use std::borrow::{Borrow, BorrowMut};
use anyhow::{bail, Context};
use async_compression::tokio::bufread::ZstdDecoder;
use bytes::Bytes;
use futures::{Stream, TryStreamExt};
use indicatif::ProgressBar;
use tracing::debug;
use nix_compat::nar::listing::{Listing, ListingEntry};
use nix_compat::narinfo::NarInfo;
use tokio::io::{AsyncReadExt, AsyncWrite};
use tokio_util::io::{ReaderStream, StreamReader};
/// Returns a NarInfo from a HTTP binary cache.
#[tracing::instrument]
pub async fn get_narinfo(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<String> {
let client = reqwest::Client::new();
let resp = client.get(format!("{}/{}.narinfo", binary_cache_uri, narinfo_hash))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.text().await
}
/// Returns a listing from a HTTP binary cache.
#[tracing::instrument]
pub async fn get_listing(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<Listing> {
let client = reqwest::Client::builder().brotli(true).build().unwrap();
let resp = client.get(format!("{}/{}.ls", binary_cache_uri, narinfo_hash))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
resp.json().await
}
/// Returns a `Content-Range`-aware HTTP body reader of the NAR.
pub async fn ranged_nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>, start: u64, end: u64) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
let client = reqwest::Client::new();
// TODO: handle compression?
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
.header("User-Agent", "nixos-channel-scripts (rust)")
.header("Content-Range", format!("bytes {}-{}/*", start, end))
.send()
.await?;
Ok(resp.bytes_stream())
}
#[tracing::instrument]
pub async fn nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
let client = reqwest::Client::new();
// TODO: handle compression?
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
.header("User-Agent", "nixos-channel-scripts (rust)")
.send()
.await?;
Ok(resp.bytes_stream())
}
/// Streams a binary cache file contained in a NAR
/// by using narinfo + listing information.
#[tracing::instrument]
pub async fn file_in_nar_bytes_stream(binary_cache_uri: &str, narinfo_hash: &str, path: &str) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
let narinfo_str: String = get_narinfo(binary_cache_uri, narinfo_hash).await?;
let narinfo: NarInfo = NarInfo::parse(narinfo_str.as_ref()).context("while parsing the narinfo").expect("Failed to parse narinfo from HTTP binary cache server");
let listing: Listing = get_listing(binary_cache_uri, narinfo_hash).await.context("while parsing the listing of that NAR")?;
match listing {
Listing::V1 { root, .. } => {
let entry = root.locate(path)
.expect("Invalid relative path to the NAR")
.ok_or(tokio::io::Error::new(tokio::io::ErrorKind::NotFound, format!("Entry {} not found in the NAR listing", path)))?;
if let ListingEntry::Regular {
size, nar_offset, ..
} = entry {
// TODO: this is ugly.
let mut file_reader = ZstdDecoder::new(
StreamReader::new(nar_reader(binary_cache_uri, narinfo).await?.map_err(std::io::Error::other))
);
// Discard *nar_offset bytes
let discarded = tokio::io::copy(&mut file_reader.borrow_mut().take(*nar_offset), &mut tokio::io::sink()).await
.context("while discarding the start of the NAR").unwrap();
debug!("discarded {} bytes and taking at most {} bytes", discarded, *size);
// Let the consumer copy at most *size bytes of data.
Ok((*size, ReaderStream::new(file_reader.take(*size))))
} else {
bail!("Expected a file, obtained either a symlink or a directory");
}
},
_ => bail!("Unsupported listing version")
}
}
/// Hits the binary cache with a narinfo request
/// and copies a specific file to the provided writer
/// asynchronously.
pub async fn copy_file_in_nar<W: AsyncWrite + Unpin>(binary_cache_uri: &str, narinfo_hash: &str, path: &str, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
let (size, stream) = file_in_nar_bytes_stream(binary_cache_uri, narinfo_hash, path).await?;
pbar.set_length(size);
let copied = tokio::io::copy(
&mut StreamReader::new(stream),
&mut pbar.wrap_async_write(out)
).await.context("while copying the file in the NAR").unwrap();
assert!(copied == size, "mismatch in the copy sizes");
pbar.finish();
return Ok(copied)
}

72
src/release.html Normal file
View file

@ -0,0 +1,72 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{channelName}} - NixOS Channel</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.header {
margin-bottom: 20px;
}
.header h1 {
margin: 0;
font-size: 1.5em;
}
.header p {
margin: 5px 0;
color: #666;
}
.links a {
color: #0066cc;
text-decoration: none;
}
.file-list {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
.file-list th, .file-list td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
.file-list th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<div class="header">
<h1>Channel: {{channelName}}</h1>
<p>Release: {{releaseName}}</p>
<p>Date of Release: {{releaseDate}}</p>
<div class="links">
<a href="{{gitCommitLink}}">Git Commit</a> |
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
</div>
</div>
<table class="file-list">
<thead>
<tr>
<th>File Name</th>
<th>Size</th>
<th>SHA-256 Hash</th>
</tr>
</thead>
<tbody>
{{#each files}}
<tr>
<td>{{name}}</td>
<td>{{size}}</td>
<td>{{sha256}}</td>
</tr>
{{/each}}
</tbody>
</table>
</body>
</html>

78
src/release.tpl Normal file
View file

@ -0,0 +1,78 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{{channelName}} - NixOS Channel</title>
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
.header {
margin-bottom: 20px;
}
.header h1 {
margin: 0;
font-size: 1.5em;
}
.header p {
margin: 5px 0;
color: #666;
}
.links a {
color: #0066cc;
text-decoration: none;
}
.file-list {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
.file-list th, .file-list td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
.file-list th {
background-color: #f2f2f2;
}
</style>
</head>
<body>
<div class="header">
<h1>Channel: {{channelName}}</h1>
<p>Release: {{releaseName}}</p>
<p>Date of Release: {{releaseDate}}</p>
<div class="links">
<a href="{{gitCommitLink}}">Git Commit</a> |
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
</div>
</div>
<table class="file-list">
<thead>
<tr>
<th>File Name</th>
<th>Size</th>
{{#if checksums}}
{{#each checksums as |checksum|}}
<th>{{checksum}} hash</th>
{{/each}}
{{/if}}
</tr>
</thead>
<tbody>
{{#each files}}
<tr>
<td><a href="./{{name}}">{{name}}</a></td>
<td>{{size}}</td>
{{#each checksums as |checksum|}}
<td>{{this}}</td>
{{/each}}
</tr>
{{/each}}
</tbody>
</table>
</body>
</html>