diff --git a/pkgs/tvix-src.nix b/pkgs/tvix-src.nix index 06033a2..75d2eda 100644 --- a/pkgs/tvix-src.nix +++ b/pkgs/tvix-src.nix @@ -2,6 +2,6 @@ fetchgit { name = "tvix"; url = "https://cl.tvl.fyi/depot"; - rev = "refs/changes/25/11925/4"; - hash = "sha256-iTkHg7YJgusq7c27rhILMsxPKTLOcL3H/0y6xFkttok="; + rev = "fdc0cf0c94827fc4940c9f8ac4d310d57aec9f35"; + hash = "sha256-KGRtHG0zBk+UJQFTztr/ZatS86VccjFlOK0BCcXWWeY="; } diff --git a/pkgs/tvix/01-remote-path-info-nar-bridge.patch b/pkgs/tvix/01-remote-path-info-nar-bridge.patch new file mode 100644 index 0000000..d49b104 --- /dev/null +++ b/pkgs/tvix/01-remote-path-info-nar-bridge.patch @@ -0,0 +1,1263 @@ +diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs +index daa8a5008..18a767284 100644 +--- a/tvix/castore/src/composition.rs ++++ b/tvix/castore/src/composition.rs +@@ -454,11 +454,4 @@ impl Composition { + *entry = Box::new(new_val); + ret + } +- +- pub fn context(&self) -> CompositionContext { +- CompositionContext { +- stack: vec![], +- composition: Some(self), +- } +- } + } +diff --git a/tvix/cli/src/lib.rs b/tvix/cli/src/lib.rs +index 070e88e35..800ffb4e0 100644 +--- a/tvix/cli/src/lib.rs ++++ b/tvix/cli/src/lib.rs +@@ -60,7 +60,7 @@ pub fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> R + Rc::new(TvixStoreIO::new( + blob_service.clone(), + directory_service.clone(), +- path_info_service, ++ path_info_service.into(), + nar_calculation_service.into(), + build_service.into(), + tokio_runtime.handle().clone(), +diff --git a/tvix/glue/benches/eval.rs b/tvix/glue/benches/eval.rs +index ee3d554dc..55f37f2d5 100644 +--- a/tvix/glue/benches/eval.rs ++++ b/tvix/glue/benches/eval.rs +@@ -34,7 +34,7 @@ fn interpret(code: &str) { + let tvix_store_io = Rc::new(TvixStoreIO::new( + blob_service, + directory_service, +- path_info_service, ++ path_info_service.into(), + nar_calculation_service.into(), + Arc::::default(), + TOKIO_RUNTIME.handle().clone(), +diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs +index e1d1c9c84..e1c20e461 100644 +--- a/tvix/glue/src/builtins/mod.rs ++++ b/tvix/glue/src/builtins/mod.rs +@@ -80,7 +80,7 @@ mod tests { + let io = Rc::new(TvixStoreIO::new( + blob_service, + directory_service, +- path_info_service, ++ path_info_service.into(), + nar_calculation_service.into(), + Arc::::default(), + runtime.handle().clone(), +diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs +index e9f21c329..142ae1a07 100644 +--- a/tvix/glue/src/tests/mod.rs ++++ b/tvix/glue/src/tests/mod.rs +@@ -41,7 +41,7 @@ fn eval_test(code_path: PathBuf, expect_success: bool) { + let tvix_store_io = Rc::new(TvixStoreIO::new( + blob_service, + directory_service, +- path_info_service, ++ path_info_service.into(), + nar_calculation_service.into(), + Arc::new(DummyBuildService::default()), + tokio_runtime.handle().clone(), +diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs +index cfc037e8d..e8c09af0e 100644 +--- a/tvix/glue/src/tvix_store_io.rs ++++ b/tvix/glue/src/tvix_store_io.rs +@@ -648,7 +648,7 @@ mod tests { + let io = Rc::new(TvixStoreIO::new( + blob_service, + directory_service, +- path_info_service, ++ path_info_service.into(), + nar_calculation_service.into(), + Arc::::default(), + tokio_runtime.handle().clone(), +diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs +index 1e4223775..b24f155a7 100644 +--- a/tvix/nar-bridge/src/bin/nar-bridge.rs ++++ b/tvix/nar-bridge/src/bin/nar-bridge.rs +@@ -1,6 +1,7 @@ + use clap::Parser; + use nar_bridge::AppState; + use tracing::info; ++use tvix_store::pathinfoservice::{CachePathInfoService, PathInfoService}; + + /// Expose the Nix HTTP Binary Cache protocol for a tvix-store. + #[derive(Parser)] +@@ -15,6 +16,12 @@ struct Cli { + #[arg(long, env, default_value = "grpc+http://[::1]:8000")] + path_info_service_addr: String, + ++ /// URL to a PathInfoService that's considered "remote". ++ /// If set, the other one is considered "local", and a "cache" for the ++ /// "remote" one. ++ #[arg(long, env)] ++ remote_path_info_service_addr: Option, ++ + /// The priority to announce at the `nix-cache-info` endpoint. + /// A lower number means it's *more preferred. + #[arg(long, env, default_value_t = 39)] +@@ -55,7 +62,25 @@ async fn main() -> Result<(), Box> { + ) + .await?; + +- let state = AppState::new(blob_service, directory_service, path_info_service); ++ // if remote_path_info_service_addr has been specified, ++ // update path_info_service to point to a cache combining the two. ++ let path_info_service = if let Some(addr) = cli.remote_path_info_service_addr { ++ let remote_path_info_service = tvix_store::pathinfoservice::from_addr( ++ &addr, ++ blob_service.clone(), ++ directory_service.clone(), ++ ) ++ .await?; ++ ++ let path_info_service = ++ CachePathInfoService::new(path_info_service, remote_path_info_service); ++ ++ Box::new(path_info_service) as Box ++ } else { ++ path_info_service ++ }; ++ ++ let state = AppState::new(blob_service, directory_service, path_info_service.into()); + + let app = nar_bridge::gen_router(cli.priority).with_state(state); + +diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs +index 99323d2a5..82c73f8f4 100644 +--- a/tvix/store/src/bin/tvix-store.rs ++++ b/tvix/store/src/bin/tvix-store.rs +@@ -16,6 +16,7 @@ use tracing::{info, info_span, instrument, Level, Span}; + use tracing_indicatif::span_ext::IndicatifSpanExt as _; + use tvix_castore::import::fs::ingest_path; + use tvix_store::nar::NarCalculationService; ++use tvix_store::pathinfoservice::CachePathInfoService; + use tvix_store::proto::NarInfo; + use tvix_store::proto::PathInfo; + +@@ -209,38 +210,31 @@ async fn run_cli(cli: Cli) -> Result<(), Box { + // initialize stores +- let mut configs = tvix_store::utils::addrs_to_configs( +- blob_service_addr, +- directory_service_addr, +- path_info_service_addr, +- )?; ++ let (blob_service, directory_service, path_info_service, nar_calculation_service) = ++ tvix_store::utils::construct_services( ++ blob_service_addr, ++ directory_service_addr, ++ path_info_service_addr, ++ ) ++ .await?; + + // if remote_path_info_service_addr has been specified, + // update path_info_service to point to a cache combining the two. +- if let Some(addr) = remote_path_info_service_addr { +- use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG}; +- use tvix_store::pathinfoservice::CachePathInfoServiceConfig; +- +- let remote_url = url::Url::parse(&addr)?; +- let remote_config = with_registry(®, || remote_url.try_into())?; +- +- let local = configs.pathinfoservices.insert( +- "default".into(), +- DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig { +- near: "local".into(), +- far: "remote".into(), +- })), +- ); +- configs +- .pathinfoservices +- .insert("local".into(), local.unwrap()); +- configs +- .pathinfoservices +- .insert("remote".into(), remote_config); +- } ++ let path_info_service = if let Some(addr) = remote_path_info_service_addr { ++ let remote_path_info_service = tvix_store::pathinfoservice::from_addr( ++ &addr, ++ blob_service.clone(), ++ directory_service.clone(), ++ ) ++ .await?; + +- let (blob_service, directory_service, path_info_service, nar_calculation_service) = +- tvix_store::utils::construct_services_from_configs(configs).await?; ++ let path_info_service = ++ CachePathInfoService::new(path_info_service, remote_path_info_service); ++ ++ Box::new(path_info_service) as Box ++ } else { ++ path_info_service ++ }; + + let mut server = Server::builder().layer( + ServiceBuilder::new() +@@ -263,7 +257,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box = path_info_service.into(); + let nar_calculation_service: Arc = + nar_calculation_service.into(); + +@@ -370,6 +365,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box = + serde_json::from_slice(reference_graph_json.as_slice())?; + ++ // Arc the PathInfoService, as we clone it . ++ let path_info_service: Arc = path_info_service.into(); ++ + let lookups_span = info_span!( + "lookup pathinfos", + "indicatif.pb_show" = tracing::field::Empty +@@ -477,7 +475,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box, + } + ++/// Represents configuration of [BigtablePathInfoService]. ++/// This currently conflates both connect parameters and data model/client ++/// behaviour parameters. ++#[serde_as] ++#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] ++pub struct BigtableParameters { ++ project_id: String, ++ instance_name: String, ++ #[serde(default)] ++ is_read_only: bool, ++ #[serde(default = "default_channel_size")] ++ channel_size: usize, ++ ++ #[serde_as(as = "Option>")] ++ #[serde(default = "default_timeout")] ++ timeout: Option, ++ table_name: String, ++ family_name: String, ++ ++ #[serde(default = "default_app_profile_id")] ++ app_profile_id: String, ++} ++ ++impl BigtableParameters { ++ #[cfg(test)] ++ pub fn default_for_tests() -> Self { ++ Self { ++ project_id: "project-1".into(), ++ instance_name: "instance-1".into(), ++ is_read_only: false, ++ channel_size: default_channel_size(), ++ timeout: default_timeout(), ++ table_name: "table-1".into(), ++ family_name: "cf1".into(), ++ app_profile_id: default_app_profile_id(), ++ } ++ } ++} ++ ++fn default_app_profile_id() -> String { ++ "default".to_owned() ++} ++ ++fn default_channel_size() -> usize { ++ 4 ++} ++ ++fn default_timeout() -> Option { ++ Some(std::time::Duration::from_secs(4)) ++} ++ + impl BigtablePathInfoService { + #[cfg(not(test))] + pub async fn connect(params: BigtableParameters) -> Result { +@@ -363,88 +412,3 @@ impl PathInfoService for BigtablePathInfoService { + Box::pin(stream) + } + } +- +-/// Represents configuration of [BigtablePathInfoService]. +-/// This currently conflates both connect parameters and data model/client +-/// behaviour parameters. +-#[serde_as] +-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] +-pub struct BigtableParameters { +- project_id: String, +- instance_name: String, +- #[serde(default)] +- is_read_only: bool, +- #[serde(default = "default_channel_size")] +- channel_size: usize, +- +- #[serde_as(as = "Option>")] +- #[serde(default = "default_timeout")] +- timeout: Option, +- table_name: String, +- family_name: String, +- +- #[serde(default = "default_app_profile_id")] +- app_profile_id: String, +-} +- +-impl BigtableParameters { +- #[cfg(test)] +- pub fn default_for_tests() -> Self { +- Self { +- project_id: "project-1".into(), +- instance_name: "instance-1".into(), +- is_read_only: false, +- channel_size: default_channel_size(), +- timeout: default_timeout(), +- table_name: "table-1".into(), +- family_name: "cf1".into(), +- app_profile_id: default_app_profile_id(), +- } +- } +-} +- +-fn default_app_profile_id() -> String { +- "default".to_owned() +-} +- +-fn default_channel_size() -> usize { +- 4 +-} +- +-fn default_timeout() -> Option { +- Some(std::time::Duration::from_secs(4)) +-} +- +-#[async_trait] +-impl ServiceBuilder for BigtableParameters { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- _context: &CompositionContext, +- ) -> Result, Box> { +- Ok(Arc::new( +- BigtablePathInfoService::connect(self.clone()).await?, +- )) +- } +-} +- +-impl TryFrom for BigtableParameters { +- type Error = Box; +- fn try_from(mut url: url::Url) -> Result { +- // parse the instance name from the hostname. +- let instance_name = url +- .host_str() +- .ok_or_else(|| Error::StorageError("instance name missing".into()))? +- .to_string(); +- +- // … but add it to the query string now, so we just need to parse that. +- url.query_pairs_mut() +- .append_pair("instance_name", &instance_name); +- +- let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) +- .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; +- +- Ok(params) +- } +-} +diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs +index bb5595f72..664144ef4 100644 +--- a/tvix/store/src/pathinfoservice/combinators.rs ++++ b/tvix/store/src/pathinfoservice/combinators.rs +@@ -1,11 +1,8 @@ +-use std::sync::Arc; +- + use crate::proto::PathInfo; + use futures::stream::BoxStream; + use nix_compat::nixbase32; + use tonic::async_trait; + use tracing::{debug, instrument}; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::Error; + + use super::PathInfoService; +@@ -64,41 +61,6 @@ where + } + } + +-#[derive(serde::Deserialize)] +-pub struct CacheConfig { +- pub near: String, +- pub far: String, +-} +- +-impl TryFrom for CacheConfig { +- type Error = Box; +- fn try_from(_url: url::Url) -> Result { +- Err(Error::StorageError( +- "Instantiating a CombinedPathInfoService from a url is not supported".into(), +- ) +- .into()) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for CacheConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- context: &CompositionContext, +- ) -> Result, Box> { +- let (near, far) = futures::join!( +- context.resolve(self.near.clone()), +- context.resolve(self.far.clone()) +- ); +- Ok(Arc::new(Cache { +- near: near?, +- far: far?, +- })) +- } +-} +- + #[cfg(test)] + mod test { + use std::num::NonZeroUsize; +diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs +index 5635c226c..9173d25d0 100644 +--- a/tvix/store/src/pathinfoservice/from_addr.rs ++++ b/tvix/store/src/pathinfoservice/from_addr.rs +@@ -1,10 +1,13 @@ +-use super::PathInfoService; ++use crate::proto::path_info_service_client::PathInfoServiceClient; + +-use crate::composition::{ +- with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, ++use super::{ ++ GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, ++ SledPathInfoService, + }; ++ ++use nix_compat::narinfo; + use std::sync::Arc; +-use tvix_castore::Error; ++use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; + use url::Url; + + /// Constructs a new instance of a [PathInfoService] from an URI. +@@ -31,21 +34,113 @@ use url::Url; + /// these also need to be passed in. + pub async fn from_addr( + uri: &str, +- context: Option<&CompositionContext<'_>>, +-) -> Result, Box> { ++ blob_service: Arc, ++ directory_service: Arc, ++) -> Result, Error> { + #[allow(unused_mut)] + let mut url = + Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; + +- let path_info_service_config = with_registry(®, || { +- >>>::try_from( +- url, +- ) +- })? +- .0; +- let path_info_service = path_info_service_config +- .build("anonymous", context.unwrap_or(&CompositionContext::blank())) +- .await?; ++ let path_info_service: Box = match url.scheme() { ++ "memory" => { ++ // memory doesn't support host or path in the URL. ++ if url.has_host() || !url.path().is_empty() { ++ return Err(Error::StorageError("invalid url".to_string())); ++ } ++ Box::::default() ++ } ++ "sled" => { ++ // sled doesn't support host, and a path can be provided (otherwise ++ // it'll live in memory only). ++ if url.has_host() { ++ return Err(Error::StorageError("no host allowed".to_string())); ++ } ++ ++ if url.path() == "/" { ++ return Err(Error::StorageError( ++ "cowardly refusing to open / with sled".to_string(), ++ )); ++ } ++ ++ // TODO: expose other parameters as URL parameters? ++ ++ Box::new(if url.path().is_empty() { ++ SledPathInfoService::new_temporary() ++ .map_err(|e| Error::StorageError(e.to_string()))? ++ } else { ++ SledPathInfoService::new(url.path()) ++ .map_err(|e| Error::StorageError(e.to_string()))? ++ }) ++ } ++ "nix+http" | "nix+https" => { ++ // Stringify the URL and remove the nix+ prefix. ++ // We can't use `url.set_scheme(rest)`, as it disallows ++ // setting something http(s) that previously wasn't. ++ let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); ++ ++ let mut nix_http_path_info_service = ++ NixHTTPPathInfoService::new(new_url, blob_service, directory_service); ++ ++ let pairs = &url.query_pairs(); ++ for (k, v) in pairs.into_iter() { ++ if k == "trusted-public-keys" { ++ let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); ++ ++ let mut pubkeys: Vec = Vec::with_capacity(pubkey_strs.len()); ++ for pubkey_str in pubkey_strs { ++ pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { ++ Error::StorageError(format!("invalid public key: {e}")) ++ })?); ++ } ++ ++ nix_http_path_info_service.set_public_keys(pubkeys); ++ } ++ } ++ ++ Box::new(nix_http_path_info_service) ++ } ++ scheme if scheme.starts_with("grpc+") => { ++ // schemes starting with grpc+ go to the GRPCPathInfoService. ++ // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. ++ // - In the case of unix sockets, there must be a path, but may not be a host. ++ // - In the case of non-unix sockets, there must be a host, but no path. ++ // Constructing the channel is handled by tvix_castore::channel::from_url. ++ Box::new(GRPCPathInfoService::from_client( ++ PathInfoServiceClient::with_interceptor( ++ tvix_castore::tonic::channel_from_url(&url).await?, ++ tvix_tracing::propagate::tonic::send_trace, ++ ), ++ )) ++ } ++ #[cfg(feature = "cloud")] ++ "bigtable" => { ++ use super::bigtable::BigtableParameters; ++ use super::BigtablePathInfoService; ++ ++ // parse the instance name from the hostname. ++ let instance_name = url ++ .host_str() ++ .ok_or_else(|| Error::StorageError("instance name missing".into()))? ++ .to_string(); ++ ++ // … but add it to the query string now, so we just need to parse that. ++ url.query_pairs_mut() ++ .append_pair("instance_name", &instance_name); ++ ++ let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) ++ .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; ++ ++ Box::new( ++ BigtablePathInfoService::connect(params) ++ .await ++ .map_err(|e| Error::StorageError(e.to_string()))?, ++ ) ++ } ++ _ => Err(Error::StorageError(format!( ++ "unknown scheme: {}", ++ url.scheme() ++ )))?, ++ }; + + Ok(path_info_service) + } +@@ -53,12 +148,14 @@ pub async fn from_addr( + #[cfg(test)] + mod tests { + use super::from_addr; +- use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; + use lazy_static::lazy_static; + use rstest::rstest; ++ use std::sync::Arc; + use tempfile::TempDir; +- use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; +- use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; ++ use tvix_castore::{ ++ blobservice::{BlobService, MemoryBlobService}, ++ directoryservice::{DirectoryService, MemoryDirectoryService}, ++ }; + + lazy_static! { + static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); +@@ -127,19 +224,11 @@ mod tests { + )] + #[tokio::test] + async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { +- let mut comp = Composition::default(); +- comp.extend(vec![( +- "default".into(), +- DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) +- as Box>), +- )]); +- comp.extend(vec![( +- "default".into(), +- DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) +- as Box>), +- )]); +- +- let resp = from_addr(uri_str, Some(&comp.context())).await; ++ let blob_service: Arc = Arc::from(MemoryBlobService::default()); ++ let directory_service: Arc = ++ Arc::from(MemoryDirectoryService::default()); ++ ++ let resp = from_addr(uri_str, blob_service, directory_service).await; + + if exp_succeed { + resp.expect("should succeed"); +diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs +index 2ac0e4330..bcee49aac 100644 +--- a/tvix/store/src/pathinfoservice/grpc.rs ++++ b/tvix/store/src/pathinfoservice/grpc.rs +@@ -6,11 +6,9 @@ use crate::{ + use async_stream::try_stream; + use futures::stream::BoxStream; + use nix_compat::nixbase32; +-use std::sync::Arc; + use tonic::{async_trait, Code}; + use tracing::{instrument, Span}; + use tracing_indicatif::span_ext::IndicatifSpanExt; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::{proto as castorepb, Error}; + + /// Connects to a (remote) tvix-store PathInfoService over gRPC. +@@ -151,40 +149,6 @@ where + } + } + +-#[derive(serde::Deserialize, Debug)] +-#[serde(deny_unknown_fields)] +-pub struct GRPCPathInfoServiceConfig { +- url: String, +-} +- +-impl TryFrom for GRPCPathInfoServiceConfig { +- type Error = Box; +- fn try_from(url: url::Url) -> Result { +- // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. +- // - In the case of unix sockets, there must be a path, but may not be a host. +- // - In the case of non-unix sockets, there must be a host, but no path. +- // Constructing the channel is handled by tvix_castore::channel::from_url. +- Ok(GRPCPathInfoServiceConfig { +- url: url.to_string(), +- }) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for GRPCPathInfoServiceConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- _context: &CompositionContext, +- ) -> Result, Box> { +- let client = proto::path_info_service_client::PathInfoServiceClient::new( +- tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, +- ); +- Ok(Arc::new(GRPCPathInfoService::from_client(client))) +- } +-} +- + #[cfg(test)] + mod tests { + use crate::pathinfoservice::tests::make_grpc_path_info_service_client; +diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs +index 39c592bc9..da674f497 100644 +--- a/tvix/store/src/pathinfoservice/lru.rs ++++ b/tvix/store/src/pathinfoservice/lru.rs +@@ -9,7 +9,6 @@ use tonic::async_trait; + use tracing::instrument; + + use crate::proto::PathInfo; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::Error; + + use super::PathInfoService; +@@ -61,34 +60,6 @@ impl PathInfoService for LruPathInfoService { + } + } + +-#[derive(serde::Deserialize, Debug)] +-#[serde(deny_unknown_fields)] +-pub struct LruPathInfoServiceConfig { +- capacity: NonZeroUsize, +-} +- +-impl TryFrom for LruPathInfoServiceConfig { +- type Error = Box; +- fn try_from(_url: url::Url) -> Result { +- Err(Error::StorageError( +- "Instantiating a LruPathInfoService from a url is not supported".into(), +- ) +- .into()) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for LruPathInfoServiceConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- _context: &CompositionContext, +- ) -> Result, Box> { +- Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) +- } +-} +- + #[cfg(test)] + mod test { + use std::num::NonZeroUsize; +diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs +index 3fabd239c..3de3221df 100644 +--- a/tvix/store/src/pathinfoservice/memory.rs ++++ b/tvix/store/src/pathinfoservice/memory.rs +@@ -7,7 +7,6 @@ use std::{collections::HashMap, sync::Arc}; + use tokio::sync::RwLock; + use tonic::async_trait; + use tracing::instrument; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::Error; + + #[derive(Default)] +@@ -60,30 +59,3 @@ impl PathInfoService for MemoryPathInfoService { + }) + } + } +- +-#[derive(serde::Deserialize, Debug)] +-#[serde(deny_unknown_fields)] +-pub struct MemoryPathInfoServiceConfig {} +- +-impl TryFrom for MemoryPathInfoServiceConfig { +- type Error = Box; +- fn try_from(url: url::Url) -> Result { +- // memory doesn't support host or path in the URL. +- if url.has_host() || !url.path().is_empty() { +- return Err(Error::StorageError("invalid url".to_string()).into()); +- } +- Ok(MemoryPathInfoServiceConfig {}) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for MemoryPathInfoServiceConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- _context: &CompositionContext, +- ) -> Result, Box> { +- Ok(Arc::new(MemoryPathInfoService::default())) +- } +-} +diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs +index 70f752f22..574bcc0b8 100644 +--- a/tvix/store/src/pathinfoservice/mod.rs ++++ b/tvix/store/src/pathinfoservice/mod.rs +@@ -14,26 +14,22 @@ mod tests; + + use futures::stream::BoxStream; + use tonic::async_trait; +-use tvix_castore::composition::{Registry, ServiceBuilder}; + use tvix_castore::Error; + +-use crate::nar::NarCalculationService; + use crate::proto::PathInfo; + +-pub use self::combinators::{ +- Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, +-}; ++pub use self::combinators::Cache as CachePathInfoService; + pub use self::from_addr::from_addr; +-pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; +-pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; +-pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; +-pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; +-pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; ++pub use self::grpc::GRPCPathInfoService; ++pub use self::lru::LruPathInfoService; ++pub use self::memory::MemoryPathInfoService; ++pub use self::nix_http::NixHTTPPathInfoService; ++pub use self::sled::SledPathInfoService; + + #[cfg(feature = "cloud")] + mod bigtable; + #[cfg(feature = "cloud")] +-pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; ++pub use self::bigtable::BigtablePathInfoService; + + #[cfg(any(feature = "fuse", feature = "virtiofs"))] + pub use self::fs::make_fs; +@@ -56,16 +52,12 @@ pub trait PathInfoService: Send + Sync { + /// Rust doesn't support this as a generic in traits yet. This is the same thing that + /// [async_trait] generates, but for streams instead of futures. + fn list(&self) -> BoxStream<'static, Result>; +- +- fn nar_calculation_service(&self) -> Option> { +- None +- } + } + + #[async_trait] + impl PathInfoService for A + where +- A: AsRef + Send + Sync + 'static, ++ A: AsRef + Send + Sync, + { + async fn get(&self, digest: [u8; 20]) -> Result, Error> { + self.as_ref().get(digest).await +@@ -79,19 +71,3 @@ where + self.as_ref().list() + } + } +- +-/// Registers the builtin PathInfoService implementations with the registry +-pub(crate) fn register_pathinfo_services(reg: &mut Registry) { +- reg.register::>, CachePathInfoServiceConfig>("cache"); +- reg.register::>, GRPCPathInfoServiceConfig>("grpc"); +- reg.register::>, LruPathInfoServiceConfig>("lru"); +- reg.register::>, MemoryPathInfoServiceConfig>("memory"); +- reg.register::>, NixHTTPPathInfoServiceConfig>("nix"); +- reg.register::>, SledPathInfoServiceConfig>("sled"); +- #[cfg(feature = "cloud")] +- { +- reg.register::>, BigtableParameters>( +- "bigtable", +- ); +- } +-} +diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs +index af9234bc0..57fe37f44 100644 +--- a/tvix/store/src/pathinfoservice/nix_http.rs ++++ b/tvix/store/src/pathinfoservice/nix_http.rs +@@ -7,15 +7,12 @@ use nix_compat::{ + nixhash::NixHash, + }; + use reqwest::StatusCode; +-use std::sync::Arc; + use tokio::io::{self, AsyncRead}; + use tonic::async_trait; + use tracing::{debug, instrument, warn}; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::{ + blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, + }; +-use url::Url; + + /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache + /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix +@@ -252,71 +249,3 @@ where + })) + } + } +- +-#[derive(serde::Deserialize)] +-pub struct NixHTTPPathInfoServiceConfig { +- base_url: String, +- blob_service: String, +- directory_service: String, +- #[serde(default)] +- /// An optional list of [narinfo::PubKey]. +- /// If set, the .narinfo files received need to have correct signature by at least one of these. +- public_keys: Option>, +-} +- +-impl TryFrom for NixHTTPPathInfoServiceConfig { +- type Error = Box; +- fn try_from(url: Url) -> Result { +- let mut public_keys: Option> = None; +- for (_, v) in url +- .query_pairs() +- .into_iter() +- .filter(|(k, _)| k == "trusted-public-keys") +- { +- public_keys +- .get_or_insert(Default::default()) +- .extend(v.split_ascii_whitespace().map(ToString::to_string)); +- } +- Ok(NixHTTPPathInfoServiceConfig { +- // Stringify the URL and remove the nix+ prefix. +- // We can't use `url.set_scheme(rest)`, as it disallows +- // setting something http(s) that previously wasn't. +- base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), +- blob_service: "default".to_string(), +- directory_service: "default".to_string(), +- public_keys, +- }) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for NixHTTPPathInfoServiceConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- context: &CompositionContext, +- ) -> Result, Box> { +- let (blob_service, directory_service) = futures::join!( +- context.resolve(self.blob_service.clone()), +- context.resolve(self.directory_service.clone()) +- ); +- let mut svc = NixHTTPPathInfoService::new( +- Url::parse(&self.base_url)?, +- blob_service?, +- directory_service?, +- ); +- if let Some(public_keys) = &self.public_keys { +- svc.set_public_keys( +- public_keys +- .iter() +- .map(|pubkey_str| { +- narinfo::PubKey::parse(pubkey_str) +- .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) +- }) +- .collect::, Error>>()?, +- ); +- } +- Ok(Arc::new(svc)) +- } +-} +diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs +index 4255bfd1d..96ade1816 100644 +--- a/tvix/store/src/pathinfoservice/sled.rs ++++ b/tvix/store/src/pathinfoservice/sled.rs +@@ -5,10 +5,8 @@ use futures::stream::BoxStream; + use nix_compat::nixbase32; + use prost::Message; + use std::path::Path; +-use std::sync::Arc; + use tonic::async_trait; + use tracing::{instrument, warn}; +-use tvix_castore::composition::{CompositionContext, ServiceBuilder}; + use tvix_castore::Error; + + /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). +@@ -116,69 +114,3 @@ impl PathInfoService for SledPathInfoService { + }) + } + } +- +-#[derive(serde::Deserialize)] +-#[serde(deny_unknown_fields)] +-pub struct SledPathInfoServiceConfig { +- is_temporary: bool, +- #[serde(default)] +- /// required when is_temporary = false +- path: Option, +-} +- +-impl TryFrom for SledPathInfoServiceConfig { +- type Error = Box; +- fn try_from(url: url::Url) -> Result { +- // sled doesn't support host, and a path can be provided (otherwise +- // it'll live in memory only). +- if url.has_host() { +- return Err(Error::StorageError("no host allowed".to_string()).into()); +- } +- +- // TODO: expose compression and other parameters as URL parameters? +- +- Ok(if url.path().is_empty() { +- SledPathInfoServiceConfig { +- is_temporary: true, +- path: None, +- } +- } else { +- SledPathInfoServiceConfig { +- is_temporary: false, +- path: Some(url.path().to_string()), +- } +- }) +- } +-} +- +-#[async_trait] +-impl ServiceBuilder for SledPathInfoServiceConfig { +- type Output = dyn PathInfoService; +- async fn build<'a>( +- &'a self, +- _instance_name: &str, +- _context: &CompositionContext, +- ) -> Result, Box> { +- match self { +- SledPathInfoServiceConfig { +- is_temporary: true, +- path: None, +- } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), +- SledPathInfoServiceConfig { +- is_temporary: true, +- path: Some(_), +- } => Err( +- Error::StorageError("Temporary SledPathInfoService can not have path".into()) +- .into(), +- ), +- SledPathInfoServiceConfig { +- is_temporary: false, +- path: None, +- } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), +- SledPathInfoServiceConfig { +- is_temporary: false, +- path: Some(path), +- } => Ok(Arc::new(SledPathInfoService::new(path)?)), +- } +- } +-} +diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs +index a09786386..d82f2214f 100644 +--- a/tvix/store/src/utils.rs ++++ b/tvix/store/src/utils.rs +@@ -1,60 +1,18 @@ ++use std::sync::Arc; + use std::{ +- collections::HashMap, + pin::Pin, +- sync::Arc, + task::{self, Poll}, + }; + use tokio::io::{self, AsyncWrite}; + +-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; ++use tvix_castore::{ ++ blobservice::{self, BlobService}, ++ directoryservice::{self, DirectoryService}, ++}; + use url::Url; + +-use crate::composition::{ +- with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, +-}; + use crate::nar::{NarCalculationService, SimpleRenderer}; +-use crate::pathinfoservice::PathInfoService; +- +-#[derive(serde::Deserialize, Default)] +-pub struct CompositionConfigs { +- pub blobservices: +- HashMap>>>, +- pub directoryservices: HashMap< +- String, +- DeserializeWithRegistry>>, +- >, +- pub pathinfoservices: HashMap< +- String, +- DeserializeWithRegistry>>, +- >, +-} +- +-pub fn addrs_to_configs( +- blob_service_addr: impl AsRef, +- directory_service_addr: impl AsRef, +- path_info_service_addr: impl AsRef, +-) -> Result> { +- let mut configs: CompositionConfigs = Default::default(); +- +- let blob_service_url = Url::parse(blob_service_addr.as_ref())?; +- let directory_service_url = Url::parse(directory_service_addr.as_ref())?; +- let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; +- +- configs.blobservices.insert( +- "default".into(), +- with_registry(®, || blob_service_url.try_into())?, +- ); +- configs.directoryservices.insert( +- "default".into(), +- with_registry(®, || directory_service_url.try_into())?, +- ); +- configs.pathinfoservices.insert( +- "default".into(), +- with_registry(®, || path_info_service_url.try_into())?, +- ); +- +- Ok(configs) +-} ++use crate::pathinfoservice::{self, PathInfoService}; + + /// Construct the store handles from their addrs. + pub async fn construct_services( +@@ -65,52 +23,49 @@ pub async fn construct_services( + ( + Arc, + Arc, +- Arc, ++ Box, + Box, + ), + Box, + > { +- let configs = addrs_to_configs( +- blob_service_addr, +- directory_service_addr, +- path_info_service_addr, +- )?; +- construct_services_from_configs(configs).await +-} +- +-/// Construct the store handles from their addrs. +-pub async fn construct_services_from_configs( +- configs: CompositionConfigs, +-) -> Result< +- ( +- Arc, +- Arc, +- Arc, +- Box, +- ), +- Box, +-> { +- let mut comp = Composition::default(); +- +- comp.extend(configs.blobservices); +- comp.extend(configs.directoryservices); +- comp.extend(configs.pathinfoservices); +- +- let blob_service: Arc = comp.build("default").await?; +- let directory_service: Arc = comp.build("default").await?; +- let path_info_service: Arc = comp.build("default").await?; ++ let blob_service: Arc = ++ blobservice::from_addr(blob_service_addr.as_ref()).await?; ++ let directory_service: Arc = ++ directoryservice::from_addr(directory_service_addr.as_ref()).await?; ++ ++ let path_info_service = pathinfoservice::from_addr( ++ path_info_service_addr.as_ref(), ++ blob_service.clone(), ++ directory_service.clone(), ++ ) ++ .await?; + + // HACK: The grpc client also implements NarCalculationService, and we + // really want to use it (otherwise we'd need to fetch everything again for hashing). + // Until we revamped store composition and config, detect this special case here. +- let nar_calculation_service: Box = path_info_service +- .nar_calculation_service() +- .unwrap_or_else(|| { ++ let nar_calculation_service: Box = { ++ use crate::pathinfoservice::GRPCPathInfoService; ++ use crate::proto::path_info_service_client::PathInfoServiceClient; ++ ++ let url = Url::parse(path_info_service_addr.as_ref()) ++ .map_err(|e| io::Error::other(e.to_string()))?; ++ ++ if url.scheme().starts_with("grpc+") { ++ Box::new(GRPCPathInfoService::from_client( ++ PathInfoServiceClient::with_interceptor( ++ tvix_castore::tonic::channel_from_url(&url) ++ .await ++ .map_err(|e| io::Error::other(e.to_string()))?, ++ tvix_tracing::propagate::tonic::send_trace, ++ ), ++ )) ++ } else { + Box::new(SimpleRenderer::new( + blob_service.clone(), + directory_service.clone(), +- )) +- }); ++ )) as Box ++ } ++ }; + + Ok(( + blob_service, diff --git a/pkgs/tvix/Cargo.lock b/pkgs/tvix/Cargo.lock index 4b73146..08b0666 100644 --- a/pkgs/tvix/Cargo.lock +++ b/pkgs/tvix/Cargo.lock @@ -2262,8 +2262,6 @@ dependencies = [ "tokio-util", "tonic", "tonic-build", - "tower", - "tower-http 0.5.2", "tracing", "tracing-subscriber", "tvix-castore", @@ -4352,23 +4350,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower-http" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" -dependencies = [ - "bitflags 2.4.2", - "bytes", - "http 1.1.0", - "http-body 1.0.0", - "http-body-util", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower-layer" version = "0.3.2" @@ -4774,7 +4755,7 @@ dependencies = [ "tonic-build", "tonic-reflection", "tower", - "tower-http 0.4.4", + "tower-http", "tracing", "tracing-indicatif", "tvix-castore", @@ -4787,7 +4768,6 @@ dependencies = [ name = "tvix-tracing" version = "0.1.0" dependencies = [ - "axum 0.7.5", "http 0.2.11", "indicatif", "lazy_static", diff --git a/pkgs/tvix/default.nix b/pkgs/tvix/default.nix index e52834e..31ece02 100644 --- a/pkgs/tvix/default.nix +++ b/pkgs/tvix/default.nix @@ -12,7 +12,8 @@ rustPlatform.buildRustPackage rec { src = tvix-src; - sourceRoot = "${src.name}/tvix"; + patches = [ ./01-remote-path-info-nar-bridge.patch ]; + postPatch = "cd tvix"; doCheck = false;