Compare commits
No commits in common. "main" and "main" have entirely different histories.
5
.gitignore
vendored
5
.gitignore
vendored
|
@ -1,6 +1 @@
|
||||||
result
|
result
|
||||||
|
|
||||||
|
|
||||||
# Added by cargo
|
|
||||||
|
|
||||||
/target
|
|
||||||
|
|
2933
Cargo.lock
generated
2933
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
39
Cargo.toml
39
Cargo.toml
|
@ -1,39 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "mirror-forkos"
|
|
||||||
version = "0.1.0"
|
|
||||||
edition = "2021"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
clap = { version = "4.5.13", features = [ "derive" ] }
|
|
||||||
indicatif = { version = "0.17.8", features = ["futures", "tokio"] }
|
|
||||||
object_store = { version = "0.10.2", features = [ "aws" ] }
|
|
||||||
regex = "1.10.6"
|
|
||||||
reqwest = { version = "0.12.5", default-features = false, features = ["brotli", "http2", "rustls-tls-native-roots"] }
|
|
||||||
serde = "1.0.204"
|
|
||||||
tempdir = "0.3.7"
|
|
||||||
textwrap = "0.16.1"
|
|
||||||
tokio = { version = "1.39.2", features = ["full"] }
|
|
||||||
toml = "0.8.19"
|
|
||||||
nix-compat = { git = "https://github.com/tvlfyi/tvix" }
|
|
||||||
futures = "0.3.30"
|
|
||||||
anyhow = "1.0.86"
|
|
||||||
bytes = "1.7.1"
|
|
||||||
tokio-util = { version = "0.7.11", features = ["io"] }
|
|
||||||
termtree = "0.5.1"
|
|
||||||
walkdir = "2.5.0"
|
|
||||||
handlebars = "6.0.0"
|
|
||||||
serde_json = "1.0.127"
|
|
||||||
tracing = { version = "0.1.40", features = ["max_level_debug", "release_max_level_info"] }
|
|
||||||
zstd = "0.13.2"
|
|
||||||
tokio-stream = "0.1.15"
|
|
||||||
async-compression = { version = "0.4.12", features = ["tokio", "zstd"] }
|
|
||||||
digest = "0.10.7"
|
|
||||||
blake3 = { version = "1.5.4", features = ["traits-preview"] }
|
|
||||||
sha2 = "0.10.8"
|
|
||||||
hex = "0.4.3"
|
|
||||||
chrono = "0.4.38"
|
|
||||||
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
|
|
||||||
opentelemetry = "0.24.0"
|
|
||||||
opentelemetry_sdk = { version = "0.24.1", features = ["rt-tokio", "trace"] }
|
|
||||||
tracing-opentelemetry = { version = "0.25.0", features = ["metrics"] }
|
|
||||||
opentelemetry-otlp = { version = "0.17.0", features = ["http-json", "reqwest-client", "reqwest-rustls"] }
|
|
|
@ -1,4 +0,0 @@
|
||||||
{
|
|
||||||
"git+https://github.com/tvlfyi/tvix#nix-compat-derive@0.1.0": "19dv9yycflfyd9wcz65xa2ymvcp0a6qknmdxr8db6s77vhdzqldj",
|
|
||||||
"git+https://github.com/tvlfyi/tvix#nix-compat@0.1.0": "19dv9yycflfyd9wcz65xa2ymvcp0a6qknmdxr8db6s77vhdzqldj"
|
|
||||||
}
|
|
14
default.nix
14
default.nix
|
@ -1,11 +1,3 @@
|
||||||
{ pkgs ? import <nixpkgs> {} }:
|
(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
|
||||||
{
|
src = builtins.fetchGit ./.;
|
||||||
shell = pkgs.mkShell {
|
}).defaultNix
|
||||||
buildInputs = [
|
|
||||||
pkgs.cargo
|
|
||||||
pkgs.rustc
|
|
||||||
pkgs.openssl
|
|
||||||
pkgs.pkg-config
|
|
||||||
];
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
63
flake.nix
63
flake.nix
|
@ -1,17 +1,74 @@
|
||||||
{
|
{
|
||||||
description = "Channel release management tools";
|
description = "Script for generating Nixpkgs/NixOS channels";
|
||||||
|
|
||||||
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small";
|
inputs.nixpkgs.url = "github:NixOS/nixpkgs/nixos-24.05-small";
|
||||||
|
|
||||||
outputs = { self, nixpkgs }:
|
outputs = { self, nixpkgs }:
|
||||||
{
|
{
|
||||||
overlays.default = final: prev: {
|
overlays.default = final: prev: {
|
||||||
mirror-forkos = (final.callPackage ./Cargo.nix { }).rootCrate.build;
|
nixos-channel-native-programs = with final; stdenv.mkDerivation {
|
||||||
|
name = "nixos-channel-native-programs";
|
||||||
|
buildInputs = [
|
||||||
|
nix
|
||||||
|
pkg-config
|
||||||
|
boehmgc
|
||||||
|
nlohmann_json
|
||||||
|
boost
|
||||||
|
sqlite
|
||||||
|
];
|
||||||
|
|
||||||
|
buildCommand = let
|
||||||
|
nixHasSignalsHh = nixpkgs.lib.strings.versionAtLeast nix.version "2.19";
|
||||||
|
in ''
|
||||||
|
mkdir -p $out/bin
|
||||||
|
|
||||||
|
g++ -Os -g ${./index-debuginfo.cc} -Wall -std=c++14 -o $out/bin/index-debuginfo -I . \
|
||||||
|
$(pkg-config --cflags nix-main) \
|
||||||
|
$(pkg-config --libs nix-main) \
|
||||||
|
$(pkg-config --libs nix-store) \
|
||||||
|
-lsqlite3 \
|
||||||
|
${nixpkgs.lib.optionalString nixHasSignalsHh "-DHAS_SIGNALS_HH"}
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
|
||||||
|
nixos-channel-scripts = with final; stdenv.mkDerivation {
|
||||||
|
name = "nixos-channel-scripts";
|
||||||
|
|
||||||
|
buildInputs = with perlPackages;
|
||||||
|
[ nix
|
||||||
|
sqlite
|
||||||
|
makeWrapper
|
||||||
|
perl
|
||||||
|
FileSlurp
|
||||||
|
LWP
|
||||||
|
LWPProtocolHttps
|
||||||
|
ListMoreUtils
|
||||||
|
DBDSQLite
|
||||||
|
NetAmazonS3
|
||||||
|
brotli
|
||||||
|
jq
|
||||||
|
nixos-channel-native-programs
|
||||||
|
nix-index
|
||||||
|
];
|
||||||
|
|
||||||
|
buildCommand = ''
|
||||||
|
mkdir -p $out/bin
|
||||||
|
|
||||||
|
cp ${./mirror-nixos-branch.pl} $out/bin/mirror-nixos-branch
|
||||||
|
wrapProgram $out/bin/mirror-nixos-branch \
|
||||||
|
--set PERL5LIB $PERL5LIB \
|
||||||
|
--set XZ_OPT "-T0" \
|
||||||
|
--prefix PATH : ${lib.makeBinPath [ wget git nix gnutar xz rsync openssh nix-index nixos-channel-native-programs ]}
|
||||||
|
|
||||||
|
patchShebangs $out/bin
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
defaultPackage.x86_64-linux = (import nixpkgs {
|
defaultPackage.x86_64-linux = (import nixpkgs {
|
||||||
system = "x86_64-linux";
|
system = "x86_64-linux";
|
||||||
overlays = [ self.overlays.default ];
|
overlays = [ self.overlays.default ];
|
||||||
}).mirror-forkos;
|
}).nixos-channel-scripts;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +0,0 @@
|
||||||
hydra_uri = "https://hydra.forkos.org"
|
|
||||||
binary_cache_uri = "https://cache.forkos.org"
|
|
||||||
nixpkgs_dir = "/var/lib/nixpkgs"
|
|
||||||
s3_release_bucket_name = "bagel-channel-scripts-test"
|
|
||||||
s3_channel_bucket_name = "bagel-channel-scripts-test"
|
|
||||||
base_git_uri_for_revision = "https://cl.forkos.org/plugins/gitiles/nixpkgs/+"
|
|
3
shell.nix
Normal file
3
shell.nix
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
(import (fetchTarball https://github.com/edolstra/flake-compat/archive/master.tar.gz) {
|
||||||
|
src = builtins.fetchGit ./.;
|
||||||
|
}).shellNix
|
612
src/actions.rs
612
src/actions.rs
|
@ -1,612 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use std::str::FromStr;
|
|
||||||
use anyhow::Context;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use digest::DynDigest;
|
|
||||||
use futures::future::join_all;
|
|
||||||
use futures::AsyncReadExt;
|
|
||||||
use sha2::Digest;
|
|
||||||
use tokio::io::{AsyncWrite, AsyncWriteExt};
|
|
||||||
use futures::Stream;
|
|
||||||
use futures::TryStreamExt;
|
|
||||||
use indicatif::MultiProgress;
|
|
||||||
use indicatif::ProgressBar;
|
|
||||||
use indicatif::ProgressDrawTarget;
|
|
||||||
use indicatif::ProgressStyle;
|
|
||||||
use tracing::debug;
|
|
||||||
use object_store::aws::AmazonS3;
|
|
||||||
use object_store::WriteMultipart;
|
|
||||||
use tempdir::TempDir;
|
|
||||||
use tracing::trace;
|
|
||||||
use crate::html::ReleaseFileData;
|
|
||||||
use crate::html::ReleasePageData;
|
|
||||||
use crate::hydra;
|
|
||||||
use crate::config;
|
|
||||||
|
|
||||||
const MULTIPART_CHUNK_SIZE: usize = 10 * 1024 * 1024; // 10 Mb
|
|
||||||
const ZSTD_COMPRESSION_LEVEL: i32 = 11; // Reasonable compression speed w.r.t. to the size of
|
|
||||||
// store-paths.
|
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone, Hash, Eq, PartialEq, clap::ValueEnum)]
|
|
||||||
pub enum Checksum {
|
|
||||||
Sha256,
|
|
||||||
Blake3
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FromStr for Checksum {
|
|
||||||
type Err = anyhow::Error;
|
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
||||||
Ok(match s {
|
|
||||||
"sha256" => Self::Sha256,
|
|
||||||
"blake3" => Self::Blake3,
|
|
||||||
_ => anyhow::bail!("unsupported checksum")
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Checksum {
|
|
||||||
pub fn public_name(&self) -> &'static str {
|
|
||||||
match self {
|
|
||||||
Self::Sha256 => "SHA-256",
|
|
||||||
Self::Blake3 => "BLAKE-3"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn into_dyn_digest(&self) -> Box<dyn DynDigest> {
|
|
||||||
match self {
|
|
||||||
Self::Sha256 => Box::new(sha2::Sha256::new()),
|
|
||||||
Self::Blake3 => Box::new(blake3::Hasher::new())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extension(&self) -> &'static str {
|
|
||||||
match self {
|
|
||||||
Self::Sha256 => "sha256",
|
|
||||||
Self::Blake3 => "b3"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Copy, Clone)]
|
|
||||||
pub enum HydraProductType {
|
|
||||||
BrotliJson,
|
|
||||||
SourceDistribution,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Compression {
|
|
||||||
Zstd,
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Compression {
|
|
||||||
pub fn http_encoding(&self) -> Option<&str> {
|
|
||||||
Some(match self {
|
|
||||||
Self::Zstd => "zstd",
|
|
||||||
Self::None => return None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn file_extension(&self) -> Option<&str> {
|
|
||||||
Some(match self {
|
|
||||||
Self::Zstd => "zst",
|
|
||||||
Self::None => return None,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ToString for HydraProductType {
|
|
||||||
fn to_string(&self) -> String {
|
|
||||||
match self {
|
|
||||||
Self::BrotliJson => "json-br".to_string(),
|
|
||||||
Self::SourceDistribution => "source-dist".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum Action {
|
|
||||||
WriteFile {
|
|
||||||
dst_path: PathBuf,
|
|
||||||
contents: Vec<u8>
|
|
||||||
},
|
|
||||||
WriteHydraProduct {
|
|
||||||
product_name: String,
|
|
||||||
dst_path: Option<PathBuf>,
|
|
||||||
product_type: Option<HydraProductType>,
|
|
||||||
},
|
|
||||||
WriteCompressedStorePaths {
|
|
||||||
compression: Compression
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Plan {
|
|
||||||
pub actions: Vec<Action>,
|
|
||||||
pub release: hydra::Release,
|
|
||||||
pub channel: hydra::Channel,
|
|
||||||
pub target_bucket: AmazonS3,
|
|
||||||
pub revision: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct PlanOptions {
|
|
||||||
pub streaming: bool,
|
|
||||||
pub enable_progress_bar: bool,
|
|
||||||
pub requested_checksums: Vec<Checksum>
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! plan_write_file {
|
|
||||||
($plan_container:ident, $dst_path:literal, $contents:expr) => {
|
|
||||||
$plan_container.push(Action::WriteFile {
|
|
||||||
dst_path: $dst_path.into(),
|
|
||||||
contents: $contents
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! plan_write_hydra_product {
|
|
||||||
($plan_container:ident, $pname:expr) => {
|
|
||||||
$plan_container.push(Action::WriteHydraProduct {
|
|
||||||
product_name: $pname.into(),
|
|
||||||
dst_path: None,
|
|
||||||
product_type: None
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
($plan_container:ident, $dst_path:literal, $pname:expr) => {
|
|
||||||
$plan_container.push(Action::WriteHydraProduct {
|
|
||||||
dst_path: Some($dst_path.into()),
|
|
||||||
product_name: $pname.into(),
|
|
||||||
product_type: None
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
($plan_container:ident, $dst_path:literal, $pname:expr, $ptype:expr) => {
|
|
||||||
$plan_container.push(Action::WriteHydraProduct {
|
|
||||||
dst_path: Some($dst_path.into()),
|
|
||||||
product_name: $pname.into(),
|
|
||||||
product_type: Some($ptype)
|
|
||||||
});
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
macro_rules! plan_iso {
|
|
||||||
($plan_container:ident, $iso_name:literal, $arch:literal) => {
|
|
||||||
plan_write_hydra_product!($plan_container, format!("nixos.iso_{}.{}", $iso_name, $arch));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn generate_plan(config: &config::MirrorConfig, channel: &hydra::Channel, release: &hydra::Release, evaluation: &hydra::Evaluation) -> Vec<Action> {
|
|
||||||
let mut plan = vec![];
|
|
||||||
|
|
||||||
plan.push(Action::WriteCompressedStorePaths {
|
|
||||||
compression: Compression::Zstd
|
|
||||||
});
|
|
||||||
|
|
||||||
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").unwrap() {
|
|
||||||
plan_write_file!(plan, "src-url", release.evaluation_url(&config.hydra_uri).into());
|
|
||||||
plan_write_file!(plan, "git-revision", revision.clone().into());
|
|
||||||
plan_write_file!(plan, "binary-cache-url", config.binary_cache_uri.clone().into());
|
|
||||||
|
|
||||||
if matches!(channel.r#type(), hydra::ChannelType::NixOS) {
|
|
||||||
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "nixos.channel");
|
|
||||||
plan_write_hydra_product!(plan, "packages.json.br", "nixpkgs.tarball",
|
|
||||||
HydraProductType::BrotliJson
|
|
||||||
);
|
|
||||||
plan_write_hydra_product!(plan, "options.json.br", "nixos.options",
|
|
||||||
HydraProductType::BrotliJson
|
|
||||||
);
|
|
||||||
|
|
||||||
// TODO: we don't have aarch64-linux yet
|
|
||||||
//plan_iso!(plan, "minimal", "aarch64-linux");
|
|
||||||
plan_iso!(plan, "minimal", "x86_64-linux");
|
|
||||||
|
|
||||||
if !matches!(channel.variant(), hydra::ChannelVariant::Small) {
|
|
||||||
// TODO: for some reason, those ISOs are not present in ForkOS hydra.
|
|
||||||
// TODO: implement error recovery, skipping or failure?
|
|
||||||
// maybe encode this in the plan (fallible, etc.)
|
|
||||||
// TODO: compat with nixos-20, 21, 22, 23 using plasma5?
|
|
||||||
// should we have plasma_latest?
|
|
||||||
//plan_iso!(plan, "plasma6", "aarch64-linux");
|
|
||||||
//plan_iso!(plan, "plasma6", "x86_64-linux");
|
|
||||||
|
|
||||||
//plan_iso!(plan, "gnome", "aarch64-linux");
|
|
||||||
//plan_iso!(plan, "gnome", "x86_64-linux");
|
|
||||||
|
|
||||||
// TODO: i686-linux ISO compat? for nixos ≤ 23.11
|
|
||||||
|
|
||||||
// FIXME: weird OVA appliance here, should investigate what to do
|
|
||||||
//plan_write_hydra_product!(plan, "nixos.ova.x86_64-linux");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
plan_write_hydra_product!(plan, "nixexprs.tar.xz", "tarball",
|
|
||||||
HydraProductType::SourceDistribution);
|
|
||||||
plan_write_hydra_product!(plan, "packages.json.br", "tarball",
|
|
||||||
HydraProductType::BrotliJson);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: take into account the SQLite database & debuginfo
|
|
||||||
|
|
||||||
plan
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum PlanInterpretation {
|
|
||||||
/// The plan has been streamed to the target object storage
|
|
||||||
/// It's ready to be applied by promoting atomically this temporary object store path.
|
|
||||||
Streamed {
|
|
||||||
remote_path: object_store::path::Path,
|
|
||||||
target_path: object_store::path::Path
|
|
||||||
},
|
|
||||||
/// The plan has been prepared locally and can now be inspected or uploaded.
|
|
||||||
LocallyPlanned {
|
|
||||||
local_path: TempDir,
|
|
||||||
target_path: object_store::path::Path
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn stream_to_multipart_writer<S, F>(mut input_stream: S, mut stream_callback: F, mut writer: WriteMultipart) -> std::io::Result<usize> where
|
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
|
||||||
F: FnMut(&Bytes),
|
|
||||||
{
|
|
||||||
let mut streamed = 0;
|
|
||||||
while let Some(bytes) = input_stream.try_next().await? {
|
|
||||||
streamed += bytes.len();
|
|
||||||
stream_callback(&bytes);
|
|
||||||
writer.put(bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
writer.finish().await?;
|
|
||||||
|
|
||||||
Ok(streamed)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn stream_to_async_writer<S, F, W>(mut input_stream: S, _size_hint: Option<usize>, mut stream_callback: F, mut target_file: W) -> std::io::Result<usize> where
|
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
|
||||||
F: FnMut(&Bytes),
|
|
||||||
W: AsyncWrite + Unpin
|
|
||||||
{
|
|
||||||
let mut streamed = 0;
|
|
||||||
while let Some(bytes) = input_stream.try_next().await? {
|
|
||||||
streamed += bytes.len();
|
|
||||||
target_file.write_all(&bytes).await?;
|
|
||||||
stream_callback(&bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(streamed)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn stream_to_bucket<S, F, O>(input_stream: S, size_hint: Option<usize>, target_path: &object_store::path::Path, mut stream_callback: F, bucket: &O) -> std::io::Result<usize> where
|
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
|
||||||
F: FnMut(&Bytes),
|
|
||||||
O: object_store::ObjectStore
|
|
||||||
{
|
|
||||||
let uses_multipart = if let Some(size) = size_hint {
|
|
||||||
size >= MULTIPART_CHUNK_SIZE
|
|
||||||
} else { true }; // Worst case is that the stream is bigger than memory.
|
|
||||||
|
|
||||||
if uses_multipart {
|
|
||||||
// TODO: propagate error
|
|
||||||
let mpart = bucket.put_multipart(target_path).await.unwrap();
|
|
||||||
let writer = WriteMultipart::new_with_chunk_size(mpart, MULTIPART_CHUNK_SIZE);
|
|
||||||
|
|
||||||
stream_to_multipart_writer(input_stream, stream_callback, writer).await
|
|
||||||
} else {
|
|
||||||
let mut buffer = if let Some(size) = size_hint {
|
|
||||||
Vec::with_capacity(size)
|
|
||||||
} else {
|
|
||||||
// Constantify
|
|
||||||
Vec::with_capacity(8192)
|
|
||||||
};
|
|
||||||
|
|
||||||
// TODO: find a way to propagate the progress bar here.
|
|
||||||
// call the stream callback otherwise we cannot hash entire files.
|
|
||||||
input_stream.into_async_read().read_to_end(&mut buffer).await?;
|
|
||||||
let actual_length = buffer.len();
|
|
||||||
// TODO: generalize the stream callback argument
|
|
||||||
// it's not that bad because it's usually very small here.
|
|
||||||
stream_callback(&Bytes::copy_from_slice(&buffer));
|
|
||||||
|
|
||||||
bucket.put(target_path, buffer.into()).await?;
|
|
||||||
|
|
||||||
Ok(actual_length)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn stream_to_target_nock<S, F, O>(input_stream: S,
|
|
||||||
size_hint: Option<usize>,
|
|
||||||
key_prefix: &str, key: &str,
|
|
||||||
stream_callback: F,
|
|
||||||
bucket: &O, streaming: bool) -> std::io::Result<usize> where
|
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
|
||||||
F: FnMut(&Bytes),
|
|
||||||
O: object_store::ObjectStore {
|
|
||||||
if streaming {
|
|
||||||
let bucket_path: object_store::path::Path = format!("{}/{}", key_prefix, key).into();
|
|
||||||
stream_to_bucket(input_stream, size_hint, &bucket_path, stream_callback, bucket).await
|
|
||||||
} else {
|
|
||||||
let fs_path: PathBuf = PathBuf::from(key_prefix).join(key);
|
|
||||||
let tgt_file = tokio::fs::File::create(&fs_path).await?;
|
|
||||||
stream_to_async_writer(input_stream, size_hint, stream_callback, tgt_file).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn stream_to_target<S, F, O>(input_stream: S,
|
|
||||||
size_hint: Option<usize>,
|
|
||||||
key_prefix: &str, key: &str,
|
|
||||||
stream_callback: F,
|
|
||||||
bucket: &O, streaming: bool,
|
|
||||||
requested_checksums: Vec<Checksum>) -> std::io::Result<(usize, HashMap<Checksum, Box<[u8]>>)> where
|
|
||||||
S: Stream<Item = std::io::Result<Bytes>> + Unpin,
|
|
||||||
F: Fn(&Bytes),
|
|
||||||
O: object_store::ObjectStore {
|
|
||||||
let mut hashers: HashMap<Checksum, Box<dyn DynDigest>> = HashMap::from_iter(
|
|
||||||
requested_checksums.into_iter().map(|checksum| (checksum, checksum.into_dyn_digest()))
|
|
||||||
);
|
|
||||||
|
|
||||||
let copied = stream_to_target_nock(input_stream, size_hint, key_prefix, key, |bytes| {
|
|
||||||
hashers.iter_mut().for_each(|(_, hasher)| {
|
|
||||||
hasher.update(&bytes);
|
|
||||||
});
|
|
||||||
|
|
||||||
stream_callback(&bytes);
|
|
||||||
}, bucket, streaming).await?;
|
|
||||||
|
|
||||||
// Stream all the checksums as well.
|
|
||||||
let keys: HashMap<Checksum, String> = hashers.iter().map(|(ck, _)| (ck.clone(), format!("{}.{}", key, ck.extension()))).collect();
|
|
||||||
let checksums: HashMap<Checksum, Box<[u8]>> = hashers.into_iter().map(|(ck, hasher)| (ck, hasher.finalize())).collect();
|
|
||||||
|
|
||||||
join_all(checksums.iter().map(|(ck, digest)| {
|
|
||||||
let key = keys.get(ck).unwrap();
|
|
||||||
let hexdigest = hex::encode(digest);
|
|
||||||
let ck_stream = tokio_stream::once(
|
|
||||||
Ok(Bytes::from_iter(hexdigest.into_bytes().into_iter()))
|
|
||||||
);
|
|
||||||
|
|
||||||
stream_to_target_nock(ck_stream,
|
|
||||||
Some(digest.as_ref().len()),
|
|
||||||
key_prefix,
|
|
||||||
key,
|
|
||||||
|bytes| {
|
|
||||||
trace!("streaming the checksum ({:?}); chunk size = {}", ck.clone(), bytes.len());
|
|
||||||
}, bucket, streaming)
|
|
||||||
})).await;
|
|
||||||
|
|
||||||
Ok((copied, checksums))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Interprets the plan as a list of action.
|
|
||||||
///
|
|
||||||
/// It creates a staging directory where all actions are prepared.
|
|
||||||
/// No irreversible action is committed.
|
|
||||||
///
|
|
||||||
/// At the end, it returns the staging directory for further post-processing, including possible
|
|
||||||
/// uploads.
|
|
||||||
///
|
|
||||||
/// Enabling streaming will also directly stream the contents of the staging directory to the
|
|
||||||
/// target bucket in a temporary suffix of the target prefix.
|
|
||||||
/// Caller is responsible to promote that upload to a finalized upload.
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn interpret_plan(hydra_client: &hydra::HydraClient<'_>, plan: Plan, plan_opts: PlanOptions) -> PlanInterpretation {
|
|
||||||
// Prepare a staging directory
|
|
||||||
let staging_directory = TempDir::new(format!("staging-{}", &plan.channel.name).as_str()).expect("Failed to create temporary directory for staging directory");
|
|
||||||
let staging_prefix = staging_directory.path().file_name()
|
|
||||||
.expect("Failed to obtain the final component of the staging directory")
|
|
||||||
.to_str()
|
|
||||||
.expect("Failed to convert the final component into string")
|
|
||||||
.to_owned();
|
|
||||||
let streaming = plan_opts.streaming;
|
|
||||||
|
|
||||||
let key_prefix = if streaming {
|
|
||||||
staging_prefix.to_string()
|
|
||||||
} else {
|
|
||||||
staging_directory.path().to_str().expect("Staging directory path is not valid UTF-8").to_owned()
|
|
||||||
};
|
|
||||||
|
|
||||||
if streaming {
|
|
||||||
debug!("Streaming the update to /{}", staging_prefix);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
debug!("Preparing locally the update in {}", staging_directory.path().display());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize multiple progress bars if we are to show progress bars.
|
|
||||||
let multi_pbar = MultiProgress::with_draw_target(if plan_opts.enable_progress_bar {
|
|
||||||
ProgressDrawTarget::stdout()
|
|
||||||
} else {
|
|
||||||
ProgressDrawTarget::hidden()
|
|
||||||
});
|
|
||||||
|
|
||||||
// Pending download/upload/streaming tasks.
|
|
||||||
let mut pending = Vec::new();
|
|
||||||
|
|
||||||
for action in plan.actions {
|
|
||||||
let task = async {
|
|
||||||
let pbar = ProgressBar::new(0);
|
|
||||||
multi_pbar.add(pbar.clone());
|
|
||||||
pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
|
|
||||||
let requested_checksums = plan_opts.requested_checksums.clone();
|
|
||||||
|
|
||||||
match action {
|
|
||||||
Action::WriteHydraProduct { product_name, dst_path, product_type } => {
|
|
||||||
let dst_path = dst_path.unwrap_or_else(|| product_name.clone().into());
|
|
||||||
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths needs to be relative in order for them to be prefixed by the staging directory");
|
|
||||||
|
|
||||||
let key = dst_path.to_str().expect("Failed to convert relative destination path to a UTF-8 key");
|
|
||||||
let (size, stream) = hydra_client.hydra_product_bytes_stream(&plan.release, &product_name, product_type).await
|
|
||||||
.context("while preparing the stream for a Hydra product")
|
|
||||||
.expect("Failed to stream an Hydra product");
|
|
||||||
|
|
||||||
pbar.set_length(size);
|
|
||||||
if streaming {
|
|
||||||
pbar.set_message(format!("Downloading and streaming '{}'", product_name));
|
|
||||||
} else {
|
|
||||||
pbar.set_message(format!("Copying '{}'", product_name));
|
|
||||||
}
|
|
||||||
|
|
||||||
let (copied, checksums) = stream_to_target(stream, Some(size as usize), &key_prefix, key, |bytes| {
|
|
||||||
// This is not really called when the multipart
|
|
||||||
// has completed pushing this part.
|
|
||||||
// In practice, if we are pushing as soon as we read,
|
|
||||||
// we expect the bytes chunk to be small
|
|
||||||
// and therefore, this is fairly reasonable.
|
|
||||||
trace!("multipart streaming: chunk size = {}", bytes.len());
|
|
||||||
pbar.inc(bytes.len() as u64);
|
|
||||||
}, &plan.target_bucket, streaming, requested_checksums).await
|
|
||||||
.context(
|
|
||||||
format!("while streaming to the bucket the product {}", product_name))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
pbar.finish_with_message(format!("Streamed '{}'", product_name));
|
|
||||||
|
|
||||||
debug!("Copied {} bytes", copied);
|
|
||||||
|
|
||||||
return ReleaseFileData::new(key, size, checksums);
|
|
||||||
},
|
|
||||||
Action::WriteFile { dst_path, contents } => {
|
|
||||||
// We don't need a progress bar here.
|
|
||||||
assert!(dst_path.is_relative(), "Unexpected absolute path in a channel script plan, all paths needs to be relative in order for them to be prefixed by the staging directory");
|
|
||||||
let key = dst_path.to_str().expect("Failed to convert a relative path to a UTF-8 key");
|
|
||||||
let size = contents.len();
|
|
||||||
let stream = tokio_stream::once(Ok(Bytes::from_iter(contents.into_iter())));
|
|
||||||
pbar.set_length(size as u64);
|
|
||||||
if streaming {
|
|
||||||
pbar.set_message(format!("Streaming '{}'", key));
|
|
||||||
} else {
|
|
||||||
pbar.set_message(format!("Writing '{}'", key));
|
|
||||||
}
|
|
||||||
let (copied, checksums) = stream_to_target(stream, Some(size), &key_prefix, key, |bytes| {
|
|
||||||
// This is not really called when the multipart
|
|
||||||
// has completed pushing this part.
|
|
||||||
// In practice, if we are pushing as soon as we read,
|
|
||||||
// we expect the bytes chunk to be small
|
|
||||||
// and therefore, this is fairly reasonable.
|
|
||||||
trace!("multipart streaming: chunk size = {}", bytes.len());
|
|
||||||
pbar.inc(bytes.len() as u64);
|
|
||||||
}, &plan.target_bucket, streaming, requested_checksums).await
|
|
||||||
.context(
|
|
||||||
format!("while streaming to the bucket the following key '{}'", key)
|
|
||||||
).unwrap();
|
|
||||||
|
|
||||||
pbar.finish_with_message(format!("Written '{}'", key));
|
|
||||||
|
|
||||||
debug!("Written {} bytes", copied);
|
|
||||||
|
|
||||||
return ReleaseFileData::new(key, size as u64, checksums);
|
|
||||||
},
|
|
||||||
Action::WriteCompressedStorePaths { compression } => {
|
|
||||||
let spaths = hydra_client.fetch_store_paths(&plan.release).await.context("while fetching store paths").expect("Failed to fetch store paths from Hydra");
|
|
||||||
let mut serialized = serde_json::to_vec(&spaths).context("while serializing store paths to JSON").expect("failed to serialize");
|
|
||||||
|
|
||||||
let key;
|
|
||||||
|
|
||||||
if matches!(compression, Compression::Zstd) {
|
|
||||||
let compressed_serialization = zstd::bulk::compress(&serialized, ZSTD_COMPRESSION_LEVEL).expect("Failed to compress the store-paths");
|
|
||||||
let _ = std::mem::replace(&mut serialized, compressed_serialization);
|
|
||||||
key = "store-paths.zst";
|
|
||||||
} else {
|
|
||||||
key = "store-paths";
|
|
||||||
}
|
|
||||||
|
|
||||||
let size = serialized.len();
|
|
||||||
let stream = tokio_stream::once(Ok(Bytes::from_iter(serialized.into_iter())));
|
|
||||||
|
|
||||||
pbar.set_length(size as u64);
|
|
||||||
if streaming {
|
|
||||||
pbar.set_message("Streaming compressed store paths");
|
|
||||||
} else {
|
|
||||||
pbar.set_message("Writing compressed store paths");
|
|
||||||
}
|
|
||||||
|
|
||||||
let (copied, checksums) = stream_to_target(stream, Some(size), &key_prefix, key, |bytes| {
|
|
||||||
// This is not really called when the multipart
|
|
||||||
// has completed pushing this part.
|
|
||||||
// In practice, if we are pushing as soon as we read,
|
|
||||||
// we expect the bytes chunk to be small
|
|
||||||
// and therefore, this is fairly reasonable.
|
|
||||||
trace!("multipart streaming: chunk size = {}", bytes.len());
|
|
||||||
pbar.inc(bytes.len() as u64);
|
|
||||||
}, &plan.target_bucket, streaming, requested_checksums).await
|
|
||||||
.context(
|
|
||||||
format!("while streaming to the bucket the following key '{}'", key)
|
|
||||||
).unwrap();
|
|
||||||
|
|
||||||
pbar.finish_with_message("Compressed store paths streamed");
|
|
||||||
debug!("Written {} bytes", copied);
|
|
||||||
|
|
||||||
return ReleaseFileData::new(key, size as u64, checksums);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
pending.push(task);
|
|
||||||
}
|
|
||||||
|
|
||||||
let files: Vec<ReleaseFileData> = join_all(pending).await;
|
|
||||||
let target_path = plan.release.prefix(&plan.channel);
|
|
||||||
let rendered_page = crate::html::render_release_page(ReleasePageData {
|
|
||||||
channel_name: plan.channel.name.clone(),
|
|
||||||
release_name: plan.release.nix_name.clone(),
|
|
||||||
release_date: chrono::Utc::now().format("%Y-%m-%d %H-%M UTC").to_string(),
|
|
||||||
git_commit_link: format!("{}/{}", hydra_client.config.base_git_uri_for_revision, plan.revision),
|
|
||||||
git_revision: plan.revision,
|
|
||||||
// TODO: mega evil, clean up and expose config properly.
|
|
||||||
hydra_eval_link: plan.release.evaluation_url(&hydra_client.config.hydra_uri),
|
|
||||||
hydra_eval_id: plan.release.evaluation_id(),
|
|
||||||
checksums: plan_opts.requested_checksums.iter().map(|ck| ck.public_name().to_string()).collect(),
|
|
||||||
files
|
|
||||||
}).expect("Failed to render a release page");
|
|
||||||
|
|
||||||
|
|
||||||
let pbar = ProgressBar::new(0);
|
|
||||||
multi_pbar.add(pbar.clone());
|
|
||||||
pbar.set_style(ProgressStyle::with_template("[{elapsed_precise}] {bar:40.cyan/blue} {bytes:>7}/{total_bytes:>7} @ {bytes_per_sec:>7} {msg}").unwrap().progress_chars("##-"));
|
|
||||||
let key = if streaming {
|
|
||||||
plan.release.nix_name
|
|
||||||
} else {
|
|
||||||
format!("{}.html", plan.release.nix_name)
|
|
||||||
};
|
|
||||||
let contents = rendered_page.into_bytes();
|
|
||||||
let size = contents.len();
|
|
||||||
let stream = tokio_stream::once(Ok(Bytes::from_iter(contents.into_iter())));
|
|
||||||
pbar.set_length(size as u64);
|
|
||||||
if streaming {
|
|
||||||
pbar.set_message("Streaming the release HTML page");
|
|
||||||
} else {
|
|
||||||
pbar.set_message("Writing the release HTML page");
|
|
||||||
}
|
|
||||||
let (copied, _) = stream_to_target(stream, Some(size), &key_prefix, &key, |bytes| {
|
|
||||||
// This is not really called when the multipart
|
|
||||||
// has completed pushing this part.
|
|
||||||
// In practice, if we are pushing as soon as we read,
|
|
||||||
// we expect the bytes chunk to be small
|
|
||||||
// and therefore, this is fairly reasonable.
|
|
||||||
trace!("multipart streaming: chunk size = {}", bytes.len());
|
|
||||||
pbar.inc(bytes.len() as u64);
|
|
||||||
}, &plan.target_bucket, streaming, Vec::new()).await
|
|
||||||
.context(
|
|
||||||
format!("while streaming to the bucket the following key '{}'", key)
|
|
||||||
).unwrap();
|
|
||||||
|
|
||||||
debug!("Release HTML page sent; copied {} bytes", copied);
|
|
||||||
pbar.finish_with_message("Release HTML page streamed");
|
|
||||||
|
|
||||||
if streaming {
|
|
||||||
staging_directory.close().expect("Failed to get rid of the staging directory");
|
|
||||||
PlanInterpretation::Streamed {
|
|
||||||
remote_path: staging_prefix.into(),
|
|
||||||
target_path
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
PlanInterpretation::LocallyPlanned {
|
|
||||||
local_path: staging_directory,
|
|
||||||
target_path
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,36 +0,0 @@
|
||||||
use object_store::aws::{AmazonS3, AmazonS3Builder};
|
|
||||||
use serde::Deserialize;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct MirrorConfig {
|
|
||||||
/// URI to Hydra instance
|
|
||||||
pub hydra_uri: String,
|
|
||||||
/// URI to the binary cache
|
|
||||||
pub binary_cache_uri: String,
|
|
||||||
/// Base URI to access to a web interface pertaining to a specific Git revision of the nixpkgs
|
|
||||||
/// input
|
|
||||||
pub base_git_uri_for_revision: String,
|
|
||||||
/// A path to a checkout of nixpkgs
|
|
||||||
nixpkgs_dir: PathBuf,
|
|
||||||
/// S3 releases bucket name
|
|
||||||
pub s3_release_bucket_name: String,
|
|
||||||
/// S3 channels bucket name
|
|
||||||
s3_channel_bucket_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MirrorConfig {
|
|
||||||
pub fn release_bucket(&self) -> AmazonS3 {
|
|
||||||
AmazonS3Builder::from_env()
|
|
||||||
.with_bucket_name(&self.s3_release_bucket_name)
|
|
||||||
.build()
|
|
||||||
.expect("Failed to connect to the S3 release bucket")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn channel_bucket(&self) -> AmazonS3 {
|
|
||||||
AmazonS3Builder::from_env()
|
|
||||||
.with_bucket_name(&self.s3_channel_bucket_name)
|
|
||||||
.build()
|
|
||||||
.expect("Failed to connect to the S3 channel bucket")
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1 +0,0 @@
|
||||||
|
|
49
src/html.rs
49
src/html.rs
|
@ -1,49 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use handlebars::Handlebars;
|
|
||||||
use indicatif::HumanBytes;
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
use crate::actions::Checksum;
|
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
pub struct ReleaseFileData {
|
|
||||||
pub name: String,
|
|
||||||
pub size: String,
|
|
||||||
pub checksums: HashMap<String, String>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ReleaseFileData {
|
|
||||||
pub fn new(name: &str, size: u64, checksums: HashMap<Checksum, Box<[u8]>>) -> Self {
|
|
||||||
ReleaseFileData {
|
|
||||||
name: name.to_string(),
|
|
||||||
size: HumanBytes(size).to_string(),
|
|
||||||
checksums: checksums.into_iter().map(|(ck, digest)| {
|
|
||||||
(ck.public_name().to_string(),
|
|
||||||
hex::encode(digest))
|
|
||||||
}).collect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct ReleasePageData {
|
|
||||||
pub channel_name: String,
|
|
||||||
pub release_name: String,
|
|
||||||
pub release_date: String,
|
|
||||||
pub git_commit_link: String,
|
|
||||||
pub git_revision: String,
|
|
||||||
pub hydra_eval_link: String,
|
|
||||||
pub hydra_eval_id: u64,
|
|
||||||
pub files: Vec<ReleaseFileData>,
|
|
||||||
pub checksums: Vec<String>
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn render_release_page(page_data: ReleasePageData) -> Result<String, handlebars::RenderError> {
|
|
||||||
let reg = Handlebars::new();
|
|
||||||
|
|
||||||
let release_template = include_str!("./release.tpl");
|
|
||||||
reg.render_template(&release_template, &page_data)
|
|
||||||
}
|
|
338
src/hydra.rs
338
src/hydra.rs
|
@ -1,338 +0,0 @@
|
||||||
use std::{collections::HashMap, path::PathBuf};
|
|
||||||
use anyhow::Context;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use futures::Stream;
|
|
||||||
use indicatif::ProgressBar;
|
|
||||||
use nix_compat::store_path::{StorePath, StorePathRef};
|
|
||||||
use regex::Regex;
|
|
||||||
|
|
||||||
use serde::{de::Error, Deserialize, Deserializer};
|
|
||||||
use tokio::io::AsyncWrite;
|
|
||||||
|
|
||||||
use crate::{actions::HydraProductType, config::MirrorConfig};
|
|
||||||
|
|
||||||
pub type ReleaseId = u64;
|
|
||||||
pub type EvaluationId = u64;
|
|
||||||
pub type BuildId = u64;
|
|
||||||
|
|
||||||
fn deser_bool_as_int<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
|
||||||
where D: Deserializer<'de>
|
|
||||||
{
|
|
||||||
u8::deserialize(deserializer).map(|b| if b == 1 { true } else { false })
|
|
||||||
}
|
|
||||||
|
|
||||||
fn deser_bool_as_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
|
|
||||||
where D: Deserializer<'de>
|
|
||||||
{
|
|
||||||
let s: &str = Deserialize::deserialize(deserializer)?;
|
|
||||||
|
|
||||||
match s {
|
|
||||||
"false" => Ok(false),
|
|
||||||
"true" => Ok(true),
|
|
||||||
_ => Err(Error::unknown_variant(s, &["false", "true"]))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct OutputPath {
|
|
||||||
#[serde(rename="path")]
|
|
||||||
store_path: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct BuildProduct {
|
|
||||||
#[serde(rename="path")]
|
|
||||||
store_path: String,
|
|
||||||
name: String,
|
|
||||||
subtype: String,
|
|
||||||
sha256hash: String,
|
|
||||||
r#type: String,
|
|
||||||
#[serde(rename="filesize")]
|
|
||||||
file_size: u64
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
pub struct BuildInfo {
|
|
||||||
jobset: String,
|
|
||||||
job: String,
|
|
||||||
project: String,
|
|
||||||
#[serde(deserialize_with = "deser_bool_as_int", rename="buildstatus")]
|
|
||||||
finished: bool,
|
|
||||||
#[serde(rename="releasename")]
|
|
||||||
release_name: Option<String>,
|
|
||||||
#[serde(rename="stoptime")]
|
|
||||||
stop_time: u64,
|
|
||||||
#[serde(rename="starttime")]
|
|
||||||
start_time: u64,
|
|
||||||
system: String,
|
|
||||||
id: BuildId,
|
|
||||||
#[serde(rename="buildoutputs")]
|
|
||||||
build_outputs: HashMap<String, OutputPath>,
|
|
||||||
#[serde(rename="buildproducts")]
|
|
||||||
build_products: HashMap<String, BuildProduct>,
|
|
||||||
#[serde(rename="nixname")]
|
|
||||||
nix_name: String,
|
|
||||||
#[serde(rename="drvpath")]
|
|
||||||
drv_path: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Channel {
|
|
||||||
pub name: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum ChannelType {
|
|
||||||
NixOS,
|
|
||||||
Nixpkgs,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[non_exhaustive]
|
|
||||||
pub enum ChannelVariant {
|
|
||||||
Normal,
|
|
||||||
Small
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Channel {
|
|
||||||
pub fn version(&self) -> String {
|
|
||||||
let re = Regex::new("([a-z]+)-(?<ver>.*)").unwrap();
|
|
||||||
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
|
|
||||||
|
|
||||||
caps["ver"].to_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn prefix(&self) -> String {
|
|
||||||
let re = Regex::new("(?<name>[a-z]+)-(.*)").unwrap();
|
|
||||||
let caps = re.captures(&self.name).expect("Failed to parse the channel name");
|
|
||||||
|
|
||||||
caps["name"].to_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: just regex match?
|
|
||||||
pub fn r#type(&self) -> ChannelType {
|
|
||||||
ChannelType::NixOS
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn variant(&self) -> ChannelVariant {
|
|
||||||
ChannelVariant::Normal
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
|
||||||
pub struct Release {
|
|
||||||
pub id: ReleaseId,
|
|
||||||
job: String,
|
|
||||||
#[serde(rename = "releasename")]
|
|
||||||
release_name: Option<String>,
|
|
||||||
#[serde(rename = "starttime")]
|
|
||||||
start_time: u64,
|
|
||||||
#[serde(rename = "stoptime")]
|
|
||||||
stop_time: u64,
|
|
||||||
#[serde(rename = "nixname")]
|
|
||||||
pub nix_name: String,
|
|
||||||
#[serde(rename = "jobsetevals")]
|
|
||||||
jobset_evals: Vec<EvaluationId>,
|
|
||||||
jobset: String,
|
|
||||||
#[serde(deserialize_with = "deser_bool_as_int")]
|
|
||||||
finished: bool,
|
|
||||||
priority: u64,
|
|
||||||
system: String,
|
|
||||||
timestamp: u64,
|
|
||||||
project: String,
|
|
||||||
#[serde(rename = "drvpath")]
|
|
||||||
derivation_path: String,
|
|
||||||
// ignored: buildproducts, buildoutputs, buildmetrics, buildstatus
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct GitInput {
|
|
||||||
uri: String,
|
|
||||||
revision: String
|
|
||||||
}
|
|
||||||
|
|
||||||
// FIXME(Raito): for some reason, #[serde(tag = "type"), rename_all = "lowercase"] doesn't behave
|
|
||||||
// correctly, and causes deserialization failures in practice. `untagged` is suboptimal but works
|
|
||||||
// because of the way responses works...
|
|
||||||
#[derive(Debug, Deserialize)]
|
|
||||||
#[serde(untagged, expecting = "An valid jobset input")]
|
|
||||||
pub enum Input {
|
|
||||||
Boolean {
|
|
||||||
#[serde(deserialize_with = "deser_bool_as_string")]
|
|
||||||
value: bool
|
|
||||||
},
|
|
||||||
Git { uri: String, revision: String },
|
|
||||||
Nix { value: String },
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Deserialize, Debug)]
|
|
||||||
pub struct Evaluation {
|
|
||||||
pub id: EvaluationId,
|
|
||||||
#[serde(rename="checkouttime")]
|
|
||||||
checkout_time: u64,
|
|
||||||
#[serde(rename="evaltime")]
|
|
||||||
eval_time: u64,
|
|
||||||
flake: Option<String>,
|
|
||||||
#[serde(rename="jobsetevalinputs", default)]
|
|
||||||
pub jobset_eval_inputs: HashMap<String, Input>,
|
|
||||||
timestamp: u64,
|
|
||||||
builds: Vec<BuildId>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Release {
|
|
||||||
pub fn version(&self) -> String {
|
|
||||||
let re = Regex::new(".+-(?<ver>[0-9].+)").unwrap();
|
|
||||||
let caps = re.captures(&self.nix_name).expect("Failed to parse the release name");
|
|
||||||
|
|
||||||
caps["ver"].to_string()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn evaluation_id(&self) -> u64 {
|
|
||||||
*self.jobset_evals.first().expect("Failed to obtain the corresponding evaluation, malformed release?")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn evaluation_url(&self, hydra_base_uri: &str) -> String {
|
|
||||||
let eval_id = self.evaluation_id();
|
|
||||||
format!("{}/eval/{}", hydra_base_uri, eval_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn job_url(&self, hydra_base_uri: &str, job_name: &str) -> String {
|
|
||||||
let eval_id = self.evaluation_id();
|
|
||||||
format!("{}/eval/{}/job/{}", hydra_base_uri, eval_id, job_name)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn store_paths_url(&self, hydra_base_uri: &str) -> String {
|
|
||||||
let eval_id = self.evaluation_id();
|
|
||||||
format!("{}/eval/{}/store-paths", hydra_base_uri, eval_id)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Directory related to this release.
|
|
||||||
fn directory(&self, channel: &Channel) -> String {
|
|
||||||
match channel.name.as_str() {
|
|
||||||
"nixpkgs-unstable" => "nixpkgs".to_string(),
|
|
||||||
_ => format!("{}/{}", channel.prefix(), channel.version())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn prefix(&self, channel: &Channel) -> object_store::path::Path {
|
|
||||||
format!("{}/{}", self.directory(channel), self.nix_name).into()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn release_uri(hydra_uri: &str, job_name: &str) -> String {
|
|
||||||
format!("{}/job/{}/latest", hydra_uri, job_name)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct HydraClient<'a> {
|
|
||||||
pub config: &'a MirrorConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HydraClient<'_> {
|
|
||||||
pub async fn fetch_release(&self, job_name: &str) -> reqwest::Result<Release> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let resp = client.get(release_uri(&self.config.hydra_uri, job_name))
|
|
||||||
.header("Accept", "application/json")
|
|
||||||
// TODO: put a proper version
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
resp.json().await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_evaluation(&self, release: &Release) -> reqwest::Result<Evaluation> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let resp = client.get(release.evaluation_url(&self.config.hydra_uri))
|
|
||||||
.header("Accept", "application/json")
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
resp.json().await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch_store_paths(&self, release: &Release) -> anyhow::Result<Vec<StorePath<String>>> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
|
|
||||||
Ok(client.get(release.store_paths_url(&self.config.hydra_uri))
|
|
||||||
.header("Accept", "application/json")
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json()
|
|
||||||
.await
|
|
||||||
.context(format!("while downloading store-paths information for an evaluation"))?
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn hydra_product_bytes_stream(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
|
|
||||||
.header("Accept", "application/json")
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json()
|
|
||||||
.await
|
|
||||||
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
|
|
||||||
|
|
||||||
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
|
|
||||||
for (_, product) in build_info.build_products {
|
|
||||||
if products_by_subtype.contains_key(&product.subtype) {
|
|
||||||
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
|
|
||||||
}
|
|
||||||
|
|
||||||
products_by_subtype.insert(product.subtype.clone(), product);
|
|
||||||
}
|
|
||||||
|
|
||||||
if products_by_subtype.len() > 1 && product_type.is_none() {
|
|
||||||
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
let product: BuildProduct = if let Some(ptype) = product_type {
|
|
||||||
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
|
|
||||||
} else {
|
|
||||||
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
|
|
||||||
};
|
|
||||||
|
|
||||||
// 3. FIXME: validate sha256hash during read?
|
|
||||||
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
|
|
||||||
crate::nar::file_in_nar_bytes_stream(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap()).await.context("while copying the NAR to the target")
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn copy_hydra_product<W: AsyncWrite + Unpin>(&self, release: &Release, job_name: &str, product_type: Option<HydraProductType>, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
|
|
||||||
// TODO: dry me?
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let build_info: BuildInfo = client.get(release.job_url(&self.config.hydra_uri, job_name))
|
|
||||||
.header("Accept", "application/json")
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?
|
|
||||||
.json()
|
|
||||||
.await
|
|
||||||
.context(format!("while downloading build information from {}", release.job_url(&self.config.hydra_uri, job_name)))?;
|
|
||||||
|
|
||||||
let mut products_by_subtype: HashMap<String, BuildProduct> = HashMap::new();
|
|
||||||
for (_, product) in build_info.build_products {
|
|
||||||
if products_by_subtype.contains_key(&product.subtype) {
|
|
||||||
todo!("Job {} has multiple products of the same subtype {:?}. This is not supported yet.", job_name, product_type);
|
|
||||||
}
|
|
||||||
|
|
||||||
products_by_subtype.insert(product.subtype.clone(), product);
|
|
||||||
}
|
|
||||||
|
|
||||||
if products_by_subtype.len() > 1 && product_type.is_none() {
|
|
||||||
panic!("Job {} has {} build products. Select the right product via the subtypes.", job_name, products_by_subtype.len());
|
|
||||||
}
|
|
||||||
|
|
||||||
let product: BuildProduct = if let Some(ptype) = product_type {
|
|
||||||
products_by_subtype.remove(&ptype.to_string()).expect(&format!("Expected product type '{}' but not found in the list of products", &ptype.to_string()))
|
|
||||||
} else {
|
|
||||||
products_by_subtype.into_iter().last().expect(&format!("Expected at least one build product in job {}, found zero", job_name)).1
|
|
||||||
};
|
|
||||||
|
|
||||||
// 3. FIXME: validate sha256hash during read?
|
|
||||||
let (store_path, rel_path) = StorePathRef::from_absolute_path_full(&product.store_path).expect("Failed to parse the product's store path");
|
|
||||||
crate::nar::copy_file_in_nar(&self.config.binary_cache_uri, &nix_compat::nixbase32::encode(store_path.digest()), rel_path.to_str().unwrap(), pbar, out).await.context("while copying the NAR to the target")
|
|
||||||
}
|
|
||||||
}
|
|
278
src/main.rs
278
src/main.rs
|
@ -1,278 +0,0 @@
|
||||||
mod config;
|
|
||||||
mod actions;
|
|
||||||
mod hydra;
|
|
||||||
mod git;
|
|
||||||
mod nar;
|
|
||||||
mod html;
|
|
||||||
|
|
||||||
use std::path::PathBuf;
|
|
||||||
|
|
||||||
use actions::{interpret_plan, Plan, PlanInterpretation, PlanOptions};
|
|
||||||
use clap::{Subcommand, Parser, Args};
|
|
||||||
use hydra::{Channel, Evaluation, HydraClient, Release};
|
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tracing::{info, trace, warn};
|
|
||||||
use object_store::{aws::AmazonS3, ObjectStore};
|
|
||||||
use tempdir::TempDir;
|
|
||||||
|
|
||||||
use tracing_opentelemetry::OpenTelemetryLayer;
|
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
|
||||||
use opentelemetry::trace::TracerProvider;
|
|
||||||
|
|
||||||
#[derive(Debug, Args)]
|
|
||||||
struct ChannelArgs {
|
|
||||||
/// Channel name to update
|
|
||||||
channel_name: String,
|
|
||||||
/// Job name to fetch from the Hydra instance configured
|
|
||||||
job_name: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Args)]
|
|
||||||
struct RemoteArgs {
|
|
||||||
/// Channel name to update
|
|
||||||
channel_name: String,
|
|
||||||
/// Staging prefix
|
|
||||||
staging_prefix: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Args)]
|
|
||||||
struct GlobalOpts {
|
|
||||||
/// TOML configuration file for channel updates
|
|
||||||
#[arg(short, long)]
|
|
||||||
config_file: PathBuf,
|
|
||||||
/// Whether to execute no remote side effects (S3 uploads, redirections), etc.
|
|
||||||
#[arg(short, long, default_value_t = false)]
|
|
||||||
dry_run: bool,
|
|
||||||
/// Whether to bypass all preflight checks
|
|
||||||
#[arg(short, long, default_value_t = false)]
|
|
||||||
bypass_preflight_checks: bool,
|
|
||||||
/// Enable or disable progress bar reports
|
|
||||||
#[arg(short, long, default_value_t = true)]
|
|
||||||
enable_progress_bar: bool,
|
|
||||||
/// Stream the plan instead of preparing it in a local directory
|
|
||||||
#[arg(short, long, default_value_t = true)]
|
|
||||||
stream: bool,
|
|
||||||
#[arg(long, default_values = &["sha256", "blake3"])]
|
|
||||||
checksum: Vec<actions::Checksum>
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Parser)]
|
|
||||||
#[command(version, about, long_about = None)]
|
|
||||||
struct App {
|
|
||||||
#[command(flatten)]
|
|
||||||
global_opts: GlobalOpts,
|
|
||||||
#[command(subcommand)]
|
|
||||||
command: Commands
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Subcommand)]
|
|
||||||
enum Commands {
|
|
||||||
/// Print the plan for the given channel name and job name
|
|
||||||
Plan(ChannelArgs),
|
|
||||||
/// Apply the plan that would be generated for the given channel name and job name
|
|
||||||
Apply(ChannelArgs),
|
|
||||||
/// Apply an existing remote plan that was planned in the past
|
|
||||||
ApplyRemote(RemoteArgs),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct PreflightCheckContext<'a> {
|
|
||||||
target_channel: &'a Channel,
|
|
||||||
new_release: &'a Release,
|
|
||||||
new_evaluation: &'a Evaluation,
|
|
||||||
current_release_path: Option<object_store::path::Path>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument]
|
|
||||||
async fn run_preflight_checks(channel: &Channel, release: &Release, evaluation: &Evaluation, channel_bucket: AmazonS3) -> bool {
|
|
||||||
info!("Running pre-flight checks...");
|
|
||||||
let channel_name = object_store::path::Path::parse(&channel.name).expect("Channel name should be a valid S3 path");
|
|
||||||
let mut context = PreflightCheckContext {
|
|
||||||
target_channel: channel,
|
|
||||||
new_release: release,
|
|
||||||
new_evaluation: evaluation,
|
|
||||||
current_release_path: None
|
|
||||||
};
|
|
||||||
match channel_bucket.get_opts(&channel_name, object_store::GetOptions { head: true, ..Default::default() }).await {
|
|
||||||
Ok(object_store::GetResult { attributes, meta, .. }) => {
|
|
||||||
info!("Release found: {:?}", meta);
|
|
||||||
trace!("Attributes: {:#?}", attributes);
|
|
||||||
if let Some(redirection_target) = attributes.get(&object_store::Attribute::Metadata("x-amz-website-redirect-location".into())) {
|
|
||||||
context.current_release_path = Some(redirection_target.to_string().into());
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Err(err) => {
|
|
||||||
warn!("Error while asking the channel bucket: {}", err);
|
|
||||||
todo!();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trace!("Preflight context check assembled: {:?}", context);
|
|
||||||
// TODO: run anti rollback protection
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
enum PlanResult {
|
|
||||||
AlreadyExecuted {
|
|
||||||
remote_path: object_store::path::Path
|
|
||||||
},
|
|
||||||
LocallyPrepared {
|
|
||||||
local_path: TempDir,
|
|
||||||
target_path: object_store::path::Path
|
|
||||||
},
|
|
||||||
StreamedRemotely {
|
|
||||||
remote_path: object_store::path::Path,
|
|
||||||
target_path: object_store::path::Path
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<PlanInterpretation> for PlanResult {
|
|
||||||
fn from(value: PlanInterpretation) -> Self {
|
|
||||||
match value {
|
|
||||||
PlanInterpretation::Streamed { remote_path, target_path } => PlanResult::StreamedRemotely {
|
|
||||||
remote_path,
|
|
||||||
target_path
|
|
||||||
},
|
|
||||||
PlanInterpretation::LocallyPlanned { local_path, target_path } => PlanResult::LocallyPrepared {
|
|
||||||
local_path,
|
|
||||||
target_path,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument]
|
|
||||||
async fn move_directory<OS: object_store::ObjectStore>(client: OS, from: &object_store::path::Path, to: &object_store::path::Path) -> std::io::Result<()> {
|
|
||||||
let prefix = from.to_string();
|
|
||||||
while let Some(object) = client.list(Some(from)).try_next().await? {
|
|
||||||
let source = object.location.to_string();
|
|
||||||
let stripped_object_path = source.strip_prefix(&prefix).expect(
|
|
||||||
"Failed to strip the `from` prefix during move directory"
|
|
||||||
);
|
|
||||||
let target = format!("{}/{}", to.to_string(), stripped_object_path);
|
|
||||||
client.rename(&object.location, &target.into()).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument]
|
|
||||||
async fn plan(hydra_client: &HydraClient<'_>, channel_name: &str, job_name: String, global_opts: &GlobalOpts, config: &config::MirrorConfig) -> std::io::Result<PlanResult> {
|
|
||||||
let chan = Channel {
|
|
||||||
name: channel_name.to_string()
|
|
||||||
};
|
|
||||||
info!("Planning for channel {} using job {}", &chan.name, &job_name);
|
|
||||||
let release = hydra_client.fetch_release(&job_name)
|
|
||||||
.await.expect("Failed to fetch release");
|
|
||||||
trace!("{:#?}", release);
|
|
||||||
let evaluation = hydra_client.fetch_evaluation(&release)
|
|
||||||
.await.expect("Failed to fetch evaluation");
|
|
||||||
trace!("{:?}", evaluation.jobset_eval_inputs);
|
|
||||||
|
|
||||||
let plan_options = PlanOptions {
|
|
||||||
streaming: global_opts.stream,
|
|
||||||
enable_progress_bar: global_opts.enable_progress_bar,
|
|
||||||
requested_checksums: global_opts.checksum.clone()
|
|
||||||
};
|
|
||||||
|
|
||||||
if let hydra::Input::Git { revision, .. } = evaluation.jobset_eval_inputs.get("nixpkgs").expect("Expected a nixpkgs repository") {
|
|
||||||
info!("Release information:\n- Release is: {} (build {})\n- Eval is: {}\n- Prefix is: {}\n- Git commit is {}\n",
|
|
||||||
release.nix_name, release.id, evaluation.id, release.prefix(&chan), revision);
|
|
||||||
|
|
||||||
let release_bucket = config.release_bucket();
|
|
||||||
|
|
||||||
// If the release already exists, skip it.
|
|
||||||
if release_bucket.head(&release.prefix(&chan)).await.is_ok() {
|
|
||||||
tracing::warn!("Release already exists, skipping");
|
|
||||||
return Ok(PlanResult::AlreadyExecuted {
|
|
||||||
remote_path: release.prefix(&chan)
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
let channel_bucket = config.channel_bucket();
|
|
||||||
if global_opts.bypass_preflight_checks || run_preflight_checks(&chan, &release, &evaluation, channel_bucket).await {
|
|
||||||
let actions = actions::generate_plan(&config, &chan, &release, &evaluation).await;
|
|
||||||
tracing::debug!("Action plan: {:#?}", actions);
|
|
||||||
|
|
||||||
let interpretation = interpret_plan(&hydra_client, Plan {
|
|
||||||
actions,
|
|
||||||
release,
|
|
||||||
channel: chan.clone(),
|
|
||||||
target_bucket: config.release_bucket(),
|
|
||||||
revision: revision.to_string(),
|
|
||||||
}, plan_options).await;
|
|
||||||
info!("Plan interpreted: {:?}", interpretation);
|
|
||||||
|
|
||||||
Ok(interpretation.into())
|
|
||||||
} else {
|
|
||||||
tracing::error!("Preflight check failed, cannot continue, pass `--force` if you want to bypass preflight checks");
|
|
||||||
Err(std::io::Error::other("preflight check failure"))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
panic!("Nixpkgs input is not of type Git");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> std::io::Result<()> {
|
|
||||||
let args = App::parse();
|
|
||||||
|
|
||||||
let config: config::MirrorConfig = toml::from_str(&std::fs::read_to_string(&args.global_opts.config_file)
|
|
||||||
.expect("Failed to read the configuration file"))
|
|
||||||
.expect("Failed to deserialize the configuration file");
|
|
||||||
|
|
||||||
let hydra_client: HydraClient = HydraClient {
|
|
||||||
config: &config
|
|
||||||
};
|
|
||||||
|
|
||||||
let trace_provider = opentelemetry_otlp::new_pipeline()
|
|
||||||
.tracing()
|
|
||||||
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
|
|
||||||
opentelemetry::KeyValue::new("service.name", env!("CARGO_PKG_NAME")),
|
|
||||||
opentelemetry::KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
|
|
||||||
])))
|
|
||||||
.with_exporter(opentelemetry_otlp::new_exporter().http())
|
|
||||||
.install_batch(opentelemetry_sdk::runtime::Tokio)
|
|
||||||
.expect("Couldn't create OTLP tracer");
|
|
||||||
|
|
||||||
opentelemetry::global::set_tracer_provider(trace_provider.clone());
|
|
||||||
let fmt_layer = tracing_subscriber::fmt::layer().with_writer(std::io::stderr);
|
|
||||||
|
|
||||||
tracing_subscriber::registry()
|
|
||||||
.with(tracing_subscriber::EnvFilter::from_default_env())
|
|
||||||
.with(fmt_layer)
|
|
||||||
.with(OpenTelemetryLayer::new(trace_provider.tracer("channel-scripts")))
|
|
||||||
.init();
|
|
||||||
|
|
||||||
match args.command {
|
|
||||||
Commands::Plan(channel) => {
|
|
||||||
plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
|
|
||||||
},
|
|
||||||
Commands::Apply(channel) => {
|
|
||||||
let plan = plan(&hydra_client, &channel.channel_name, channel.job_name, &args.global_opts, &config).await?;
|
|
||||||
|
|
||||||
match plan {
|
|
||||||
PlanResult::AlreadyExecuted { .. } => {
|
|
||||||
info!("Plan was already executed, no further action required");
|
|
||||||
},
|
|
||||||
PlanResult::LocallyPrepared { .. } => todo!("Uploading a plan is a todo; use streaming!"),
|
|
||||||
PlanResult::StreamedRemotely { remote_path, target_path } => {
|
|
||||||
info!("Plan was streamed remotely successfully to '{}@{}', replacing {}", config.s3_release_bucket_name, remote_path, target_path);
|
|
||||||
|
|
||||||
move_directory(config.release_bucket(), &remote_path, &target_path)
|
|
||||||
.await.expect("Failed to atomically promote the streamed channel, aborting");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
Commands::ApplyRemote(remote) => {
|
|
||||||
let release = config.release_bucket();
|
|
||||||
// TODO: fetch the release and obtain the right name.
|
|
||||||
|
|
||||||
move_directory(config.release_bucket(), &remote.staging_prefix.try_into().unwrap(), &remote.channel_name.try_into().unwrap())
|
|
||||||
.await.expect("Failed to atomically rename the staging prefix into the channel name");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
117
src/nar.rs
117
src/nar.rs
|
@ -1,117 +0,0 @@
|
||||||
use std::borrow::{Borrow, BorrowMut};
|
|
||||||
|
|
||||||
use anyhow::{bail, Context};
|
|
||||||
use async_compression::tokio::bufread::ZstdDecoder;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use futures::{Stream, TryStreamExt};
|
|
||||||
use indicatif::ProgressBar;
|
|
||||||
use tracing::debug;
|
|
||||||
use nix_compat::nar::listing::{Listing, ListingEntry};
|
|
||||||
use nix_compat::narinfo::NarInfo;
|
|
||||||
use tokio::io::{AsyncReadExt, AsyncWrite};
|
|
||||||
use tokio_util::io::{ReaderStream, StreamReader};
|
|
||||||
|
|
||||||
/// Returns a NarInfo from a HTTP binary cache.
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn get_narinfo(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<String> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let resp = client.get(format!("{}/{}.narinfo", binary_cache_uri, narinfo_hash))
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
resp.text().await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a listing from a HTTP binary cache.
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn get_listing(binary_cache_uri: &str, narinfo_hash: &str) -> reqwest::Result<Listing> {
|
|
||||||
let client = reqwest::Client::builder().brotli(true).build().unwrap();
|
|
||||||
let resp = client.get(format!("{}/{}.ls", binary_cache_uri, narinfo_hash))
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
resp.json().await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a `Content-Range`-aware HTTP body reader of the NAR.
|
|
||||||
pub async fn ranged_nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>, start: u64, end: u64) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
// TODO: handle compression?
|
|
||||||
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.header("Content-Range", format!("bytes {}-{}/*", start, end))
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(resp.bytes_stream())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn nar_reader(binary_cache_uri: &str, narinfo: NarInfo<'_>) -> reqwest::Result<impl Stream<Item=reqwest::Result<bytes::Bytes>>> {
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
// TODO: handle compression?
|
|
||||||
let resp = client.get(format!("{}/{}", binary_cache_uri, narinfo.url))
|
|
||||||
.header("User-Agent", "nixos-channel-scripts (rust)")
|
|
||||||
.send()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(resp.bytes_stream())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Streams a binary cache file contained in a NAR
|
|
||||||
/// by using narinfo + listing information.
|
|
||||||
#[tracing::instrument]
|
|
||||||
pub async fn file_in_nar_bytes_stream(binary_cache_uri: &str, narinfo_hash: &str, path: &str) -> anyhow::Result<(u64, impl Stream<Item = std::io::Result<Bytes>>)> {
|
|
||||||
let narinfo_str: String = get_narinfo(binary_cache_uri, narinfo_hash).await?;
|
|
||||||
let narinfo: NarInfo = NarInfo::parse(narinfo_str.as_ref()).context("while parsing the narinfo").expect("Failed to parse narinfo from HTTP binary cache server");
|
|
||||||
let listing: Listing = get_listing(binary_cache_uri, narinfo_hash).await.context("while parsing the listing of that NAR")?;
|
|
||||||
match listing {
|
|
||||||
Listing::V1 { root, .. } => {
|
|
||||||
let entry = root.locate(path)
|
|
||||||
.expect("Invalid relative path to the NAR")
|
|
||||||
.ok_or(tokio::io::Error::new(tokio::io::ErrorKind::NotFound, format!("Entry {} not found in the NAR listing", path)))?;
|
|
||||||
|
|
||||||
if let ListingEntry::Regular {
|
|
||||||
size, nar_offset, ..
|
|
||||||
} = entry {
|
|
||||||
// TODO: this is ugly.
|
|
||||||
let mut file_reader = ZstdDecoder::new(
|
|
||||||
StreamReader::new(nar_reader(binary_cache_uri, narinfo).await?.map_err(std::io::Error::other))
|
|
||||||
);
|
|
||||||
|
|
||||||
// Discard *nar_offset bytes
|
|
||||||
let discarded = tokio::io::copy(&mut file_reader.borrow_mut().take(*nar_offset), &mut tokio::io::sink()).await
|
|
||||||
.context("while discarding the start of the NAR").unwrap();
|
|
||||||
|
|
||||||
debug!("discarded {} bytes and taking at most {} bytes", discarded, *size);
|
|
||||||
|
|
||||||
// Let the consumer copy at most *size bytes of data.
|
|
||||||
Ok((*size, ReaderStream::new(file_reader.take(*size))))
|
|
||||||
} else {
|
|
||||||
bail!("Expected a file, obtained either a symlink or a directory");
|
|
||||||
}
|
|
||||||
|
|
||||||
},
|
|
||||||
_ => bail!("Unsupported listing version")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Hits the binary cache with a narinfo request
|
|
||||||
/// and copies a specific file to the provided writer
|
|
||||||
/// asynchronously.
|
|
||||||
pub async fn copy_file_in_nar<W: AsyncWrite + Unpin>(binary_cache_uri: &str, narinfo_hash: &str, path: &str, pbar: ProgressBar, out: &mut W) -> anyhow::Result<u64> {
|
|
||||||
let (size, stream) = file_in_nar_bytes_stream(binary_cache_uri, narinfo_hash, path).await?;
|
|
||||||
pbar.set_length(size);
|
|
||||||
|
|
||||||
let copied = tokio::io::copy(
|
|
||||||
&mut StreamReader::new(stream),
|
|
||||||
&mut pbar.wrap_async_write(out)
|
|
||||||
).await.context("while copying the file in the NAR").unwrap();
|
|
||||||
|
|
||||||
assert!(copied == size, "mismatch in the copy sizes");
|
|
||||||
pbar.finish();
|
|
||||||
|
|
||||||
return Ok(copied)
|
|
||||||
}
|
|
|
@ -1,72 +0,0 @@
|
||||||
<!DOCTYPE html>
|
|
||||||
<html lang="en">
|
|
||||||
<head>
|
|
||||||
<meta charset="UTF-8">
|
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
|
||||||
<title>{{channelName}} - NixOS Channel</title>
|
|
||||||
<style>
|
|
||||||
body {
|
|
||||||
font-family: Arial, sans-serif;
|
|
||||||
margin: 20px;
|
|
||||||
}
|
|
||||||
.header {
|
|
||||||
margin-bottom: 20px;
|
|
||||||
}
|
|
||||||
.header h1 {
|
|
||||||
margin: 0;
|
|
||||||
font-size: 1.5em;
|
|
||||||
}
|
|
||||||
.header p {
|
|
||||||
margin: 5px 0;
|
|
||||||
color: #666;
|
|
||||||
}
|
|
||||||
.links a {
|
|
||||||
color: #0066cc;
|
|
||||||
text-decoration: none;
|
|
||||||
}
|
|
||||||
.file-list {
|
|
||||||
width: 100%;
|
|
||||||
border-collapse: collapse;
|
|
||||||
margin-top: 20px;
|
|
||||||
}
|
|
||||||
.file-list th, .file-list td {
|
|
||||||
border: 1px solid #ddd;
|
|
||||||
padding: 8px;
|
|
||||||
text-align: left;
|
|
||||||
}
|
|
||||||
.file-list th {
|
|
||||||
background-color: #f2f2f2;
|
|
||||||
}
|
|
||||||
</style>
|
|
||||||
</head>
|
|
||||||
<body>
|
|
||||||
<div class="header">
|
|
||||||
<h1>Channel: {{channelName}}</h1>
|
|
||||||
<p>Release: {{releaseName}}</p>
|
|
||||||
<p>Date of Release: {{releaseDate}}</p>
|
|
||||||
<div class="links">
|
|
||||||
<a href="{{gitCommitLink}}">Git Commit</a> |
|
|
||||||
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<table class="file-list">
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>File Name</th>
|
|
||||||
<th>Size</th>
|
|
||||||
<th>SHA-256 Hash</th>
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{{#each files}}
|
|
||||||
<tr>
|
|
||||||
<td>{{name}}</td>
|
|
||||||
<td>{{size}}</td>
|
|
||||||
<td>{{sha256}}</td>
|
|
||||||
</tr>
|
|
||||||
{{/each}}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
|
@ -1,78 +0,0 @@
|
||||||
<!DOCTYPE html>
|
|
||||||
<html lang="en">
|
|
||||||
<head>
|
|
||||||
<meta charset="UTF-8">
|
|
||||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
|
||||||
<title>{{channelName}} - NixOS Channel</title>
|
|
||||||
<style>
|
|
||||||
body {
|
|
||||||
font-family: Arial, sans-serif;
|
|
||||||
margin: 20px;
|
|
||||||
}
|
|
||||||
.header {
|
|
||||||
margin-bottom: 20px;
|
|
||||||
}
|
|
||||||
.header h1 {
|
|
||||||
margin: 0;
|
|
||||||
font-size: 1.5em;
|
|
||||||
}
|
|
||||||
.header p {
|
|
||||||
margin: 5px 0;
|
|
||||||
color: #666;
|
|
||||||
}
|
|
||||||
.links a {
|
|
||||||
color: #0066cc;
|
|
||||||
text-decoration: none;
|
|
||||||
}
|
|
||||||
.file-list {
|
|
||||||
width: 100%;
|
|
||||||
border-collapse: collapse;
|
|
||||||
margin-top: 20px;
|
|
||||||
}
|
|
||||||
.file-list th, .file-list td {
|
|
||||||
border: 1px solid #ddd;
|
|
||||||
padding: 8px;
|
|
||||||
text-align: left;
|
|
||||||
}
|
|
||||||
.file-list th {
|
|
||||||
background-color: #f2f2f2;
|
|
||||||
}
|
|
||||||
</style>
|
|
||||||
</head>
|
|
||||||
<body>
|
|
||||||
<div class="header">
|
|
||||||
<h1>Channel: {{channelName}}</h1>
|
|
||||||
<p>Release: {{releaseName}}</p>
|
|
||||||
<p>Date of Release: {{releaseDate}}</p>
|
|
||||||
<div class="links">
|
|
||||||
<a href="{{gitCommitLink}}">Git Commit</a> |
|
|
||||||
<a href="{{hydraEvalLink}}">Hydra Evaluation</a>
|
|
||||||
</div>
|
|
||||||
</div>
|
|
||||||
|
|
||||||
<table class="file-list">
|
|
||||||
<thead>
|
|
||||||
<tr>
|
|
||||||
<th>File Name</th>
|
|
||||||
<th>Size</th>
|
|
||||||
{{#if checksums}}
|
|
||||||
{{#each checksums as |checksum|}}
|
|
||||||
<th>{{checksum}} hash</th>
|
|
||||||
{{/each}}
|
|
||||||
{{/if}}
|
|
||||||
</tr>
|
|
||||||
</thead>
|
|
||||||
<tbody>
|
|
||||||
{{#each files}}
|
|
||||||
<tr>
|
|
||||||
<td><a href="./{{name}}">{{name}}</a></td>
|
|
||||||
<td>{{size}}</td>
|
|
||||||
{{#each checksums as |checksum|}}
|
|
||||||
<td>{{this}}</td>
|
|
||||||
{{/each}}
|
|
||||||
</tr>
|
|
||||||
{{/each}}
|
|
||||||
</tbody>
|
|
||||||
</table>
|
|
||||||
</body>
|
|
||||||
</html>
|
|
Loading…
Reference in a new issue