feat: init

This commit is contained in:
sinavir 2024-07-22 18:02:26 +02:00
commit 8d1282ed2e
10 changed files with 4702 additions and 0 deletions

8
.gitignore vendored Normal file
View file

@ -0,0 +1,8 @@
/target
*.swp
result
result-*
.pre-commit-config.yaml

4356
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

24
Cargo.toml Normal file
View file

@ -0,0 +1,24 @@
[package]
name = "multitier-tvix-cache"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = "1.0.204"
tokio = { version = "1.38.1", features = ["full"] }
tokio-stream = "0.1.15"
tonic = "0.12.1"
tower = "0.4.13"
tower-http = "0.5.2"
tracing = "0.1.40"
tvix-castore = { git = "https://git.dgnum.eu/mdebray/tvl-depot", rev = "d070430e8a4754091b7b030d6b994915eb571523" }
tvix-store = { git = "https://git.dgnum.eu/mdebray/tvl-depot", rev = "d070430e8a4754091b7b030d6b994915eb571523", features = ["xp-store-composition" ] }
tvix-tracing = { git = "https://git.dgnum.eu/mdebray/tvl-depot", rev = "d070430e8a4754091b7b030d6b994915eb571523" }
nar-bridge = { git = "https://git.dgnum.eu/mdebray/tvl-depot", rev = "d070430e8a4754091b7b030d6b994915eb571523" }
url = "2.5.2"
clap = "4.5.9"
axum = "0.7.5"
tokio-listener = { version = "0.4.3", features = ["serde"] }
toml = "0.8.15"
futures = "0.3.30"
tonic-health = "0.12.1"

42
default.nix Normal file
View file

@ -0,0 +1,42 @@
{
sources ? import ./npins,
pkgs ? import sources.nixpkgs { },
}:
let
check = (import sources.git-hooks).run {
src = ./.;
hooks = {
# Nix Hooks
statix.enable = true;
deadnix.enable = true;
rfc101 = {
enable = true;
name = "RFC-101 formatting";
entry = "${pkgs.lib.getExe pkgs.nixfmt-rfc-style}";
files = "\\.nix$";
};
# Misc Hooks
commitizen.enable = true;
};
};
in
{
inherit pkgs;
shell = pkgs.mkShell {
name = "multitenant-tvix-binary-cache";
buildInputs = check.enabledPackages ++ [
pkgs.cargo
pkgs.protobuf
pkgs.rustc
pkgs.rust-analyzer
(pkgs.rustfmt.override { asNightly = true; })
];
shellHook = ''
${check.shellHook}
'';
};
}

79
npins/default.nix Normal file
View file

@ -0,0 +1,79 @@
# Generated by npins. Do not modify; will be overwritten regularly
let
data = builtins.fromJSON (builtins.readFile ./sources.json);
inherit (data) version;
mkSource =
spec:
assert spec ? type;
let
path =
if spec.type == "Git" then
mkGitSource spec
else if spec.type == "GitRelease" then
mkGitSource spec
else if spec.type == "PyPi" then
mkPyPiSource spec
else if spec.type == "Channel" then
mkChannelSource spec
else
builtins.throw "Unknown source type ${spec.type}";
in
spec // { outPath = path; };
mkGitSource =
{
repository,
revision,
url ? null,
hash,
...
}:
assert repository ? type;
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
# In the latter case, there we will always be an url to the tarball
if url != null then
(builtins.fetchTarball {
inherit url;
sha256 = hash; # FIXME: check nix version & use SRI hashes
})
else
assert repository.type == "Git";
let
urlToName =
url: rev:
let
matched = builtins.match "^.*/([^/]*)(\\.git)?$" repository.url;
short = builtins.substring 0 7 rev;
appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else "";
in
"${if matched == null then "source" else builtins.head matched}${appendShort}";
name = urlToName repository.url revision;
in
builtins.fetchGit {
inherit (repository) url;
rev = revision;
inherit name;
# hash = hash;
};
mkPyPiSource =
{ url, hash, ... }:
builtins.fetchurl {
inherit url;
sha256 = hash;
};
mkChannelSource =
{ url, hash, ... }:
builtins.fetchTarball {
inherit url;
sha256 = hash;
};
in
if version == 3 then
builtins.mapAttrs (_: mkSource) data.pins
else
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"

23
npins/sources.json Normal file
View file

@ -0,0 +1,23 @@
{
"pins": {
"git-hooks": {
"type": "Git",
"repository": {
"type": "GitHub",
"owner": "cachix",
"repo": "git-hooks.nix"
},
"branch": "master",
"revision": "0ff4381bbb8f7a52ca4a851660fc7a437a4c6e07",
"url": "https://github.com/cachix/git-hooks.nix/archive/0ff4381bbb8f7a52ca4a851660fc7a437a4c6e07.tar.gz",
"hash": "0bmgc731c5rvky6qxc4f6gvgyiic8dna5dv3j19kya86idf7wn0p"
},
"nixpkgs": {
"type": "Channel",
"name": "nixpkgs-unstable",
"url": "https://releases.nixos.org/nixpkgs/nixpkgs-24.11pre644361.1e3deb3d8a86/nixexprs.tar.xz",
"hash": "0q8wrydwkyyjag9dz6mazmqnzw14jgg0vzj4n5zz94zq9fgnl8kc"
}
},
"version": 3
}

3
rustfmt.toml Normal file
View file

@ -0,0 +1,3 @@
unstable_features = true
group_imports = "StdExternalCrate"
imports_granularity = "Module"

1
shell.nix Normal file
View file

@ -0,0 +1 @@
(import ./default.nix { }).shell

20
src/config.rs Normal file
View file

@ -0,0 +1,20 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct EndpointsConfig(pub HashMap<tokio_listener::ListenerAddress, SingleEndpointConfig>);
#[derive(Debug, Deserialize, Serialize)]
pub struct SingleEndpointConfig {
pub endpoint_type: EndpointType,
pub blob_service: String,
pub directory_service: String,
pub path_info_service: String,
}
#[derive(PartialEq, Eq, Debug, Deserialize, Serialize)]
pub enum EndpointType {
Http,
Grpc,
}

146
src/main.rs Normal file
View file

@ -0,0 +1,146 @@
mod config;
use tracing::Level;
use tower_http::trace::{DefaultMakeSpan, TraceLayer};
use std::future::Future;
use tower::ServiceBuilder;
use tvix_castore::proto::blob_service_server::BlobServiceServer;
use tvix_castore::proto::directory_service_server::DirectoryServiceServer;
use tvix_castore::proto::GRPCBlobServiceWrapper;
use tvix_castore::proto::GRPCDirectoryServiceWrapper;
use tvix_store::pathinfoservice::PathInfoService;
use tvix_store::proto::path_info_service_server::PathInfoServiceServer;
use tvix_store::proto::GRPCPathInfoServiceWrapper;
use tvix_store::nar::{NarCalculationService, SimpleRenderer};
use std::pin::Pin;
use std::sync::Arc;
use tonic::transport::Server;
use tvix_castore::blobservice::BlobService;
use tvix_castore::directoryservice::DirectoryService;
use clap::Parser;
use config::{EndpointType, EndpointsConfig};
use futures::future::join_all;
use tracing::info;
use tvix_store::composition::{with_registry, Composition, REG};
use tvix_store::utils::CompositionConfigs;
use {nar_bridge, toml};
/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
#[arg(long, env)]
endpoints_config: String,
#[arg(long, env)]
store_composition: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cli = Cli::parse();
let _tracing_handle = {
#[allow(unused_mut)]
let mut builder = tvix_tracing::TracingBuilder::default();
builder.build()?
};
let composition_text = tokio::fs::read_to_string(cli.store_composition).await?;
let configs: CompositionConfigs = with_registry(&REG, || toml::from_str(&composition_text))?;
let endpoints_text = tokio::fs::read_to_string(cli.endpoints_config).await?;
let endpoints: EndpointsConfig = toml::from_str(&endpoints_text)?;
info!("Starting multitenant-tvix-cache...");
let mut comp = Composition::default();
comp.extend(configs.blobservices);
comp.extend(configs.directoryservices);
comp.extend(configs.pathinfoservices);
let mut services: Vec<
Pin<Box<dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>>>,
> = Vec::with_capacity(endpoints.0.len());
for (k, v) in endpoints.0.iter() {
if v.endpoint_type == EndpointType::Http {
let router = nar_bridge::gen_router(39).with_state(nar_bridge::AppState::new(
comp.build(v.blob_service.as_ref()).await?,
comp.build(v.directory_service.as_ref()).await?,
comp.build(v.path_info_service.as_ref()).await?,
));
// TODO: Tracing ?
services.push(Box::pin(async {
let listener = build_listener(k.clone())
.await
.map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e.to_string()))?;
tokio_listener::axum07::serve(listener, router.into_make_service())
.await
.map_err(|e| Box::<dyn std::error::Error + Sync + Send>::from(e.to_string()))
}));
} else {
let blob_service: Arc<dyn BlobService> = comp.build(v.blob_service.as_ref()).await?;
let directory_service: Arc<dyn DirectoryService> = comp.build(v.directory_service.as_ref()).await?;
let path_info_service : Arc<dyn PathInfoService>= comp.build(v.path_info_service.as_ref()).await?;
let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
.nar_calculation_service()
.unwrap_or_else(|| {
Box::new(SimpleRenderer::new(
blob_service.clone(),
directory_service.clone(),
))
});
let mut server = Server::builder().layer(
ServiceBuilder::new()
.layer(
TraceLayer::new_for_grpc().make_span_with(
DefaultMakeSpan::new()
.level(Level::INFO)
.include_headers(true),
),
)
.map_request(tvix_tracing::propagate::tonic::accept_trace),
);
let (_health_reporter, health_service) = tonic_health::server::health_reporter();
let router = server
.add_service(health_service)
.add_service(BlobServiceServer::new(GRPCBlobServiceWrapper::new(
blob_service,
)))
.add_service(DirectoryServiceServer::new(
GRPCDirectoryServiceWrapper::new(directory_service),
))
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
path_info_service,
nar_calculation_service,
)));
let listener = build_listener(k.clone())
.await
.map_err(|e| Box::<dyn std::error::Error + Send + Sync>::from(e.to_string()))?;
router.serve_with_incoming(listener).await?;
}
}
join_all(services).await;
Ok(())
}
async fn build_listener(
listen_address: tokio_listener::ListenerAddress,
) -> Result<tokio_listener::Listener, std::io::Error> {
tokio_listener::Listener::bind(&listen_address, &Default::default(), &Default::default()).await
}