From bb40996c4b287fb5f1c20efcd086053e3b5e91c6 Mon Sep 17 00:00:00 2001 From: Yureka Date: Wed, 24 Jul 2024 10:14:15 +0200 Subject: [PATCH] feat(pkgs/tvix): update and apply patch to canon --- .../tvix/01-remote-path-info-nar-bridge.patch | 1263 ----------------- pkgs/tvix/default.nix | 15 +- 2 files changed, 11 insertions(+), 1267 deletions(-) delete mode 100644 pkgs/tvix/01-remote-path-info-nar-bridge.patch diff --git a/pkgs/tvix/01-remote-path-info-nar-bridge.patch b/pkgs/tvix/01-remote-path-info-nar-bridge.patch deleted file mode 100644 index d49b104..0000000 --- a/pkgs/tvix/01-remote-path-info-nar-bridge.patch +++ /dev/null @@ -1,1263 +0,0 @@ -diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs -index daa8a5008..18a767284 100644 ---- a/tvix/castore/src/composition.rs -+++ b/tvix/castore/src/composition.rs -@@ -454,11 +454,4 @@ impl Composition { - *entry = Box::new(new_val); - ret - } -- -- pub fn context(&self) -> CompositionContext { -- CompositionContext { -- stack: vec![], -- composition: Some(self), -- } -- } - } -diff --git a/tvix/cli/src/lib.rs b/tvix/cli/src/lib.rs -index 070e88e35..800ffb4e0 100644 ---- a/tvix/cli/src/lib.rs -+++ b/tvix/cli/src/lib.rs -@@ -60,7 +60,7 @@ pub fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> R - Rc::new(TvixStoreIO::new( - blob_service.clone(), - directory_service.clone(), -- path_info_service, -+ path_info_service.into(), - nar_calculation_service.into(), - build_service.into(), - tokio_runtime.handle().clone(), -diff --git a/tvix/glue/benches/eval.rs b/tvix/glue/benches/eval.rs -index ee3d554dc..55f37f2d5 100644 ---- a/tvix/glue/benches/eval.rs -+++ b/tvix/glue/benches/eval.rs -@@ -34,7 +34,7 @@ fn interpret(code: &str) { - let tvix_store_io = Rc::new(TvixStoreIO::new( - blob_service, - directory_service, -- path_info_service, -+ path_info_service.into(), - nar_calculation_service.into(), - Arc::::default(), - TOKIO_RUNTIME.handle().clone(), -diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs -index e1d1c9c84..e1c20e461 100644 ---- a/tvix/glue/src/builtins/mod.rs -+++ b/tvix/glue/src/builtins/mod.rs -@@ -80,7 +80,7 @@ mod tests { - let io = Rc::new(TvixStoreIO::new( - blob_service, - directory_service, -- path_info_service, -+ path_info_service.into(), - nar_calculation_service.into(), - Arc::::default(), - runtime.handle().clone(), -diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs -index e9f21c329..142ae1a07 100644 ---- a/tvix/glue/src/tests/mod.rs -+++ b/tvix/glue/src/tests/mod.rs -@@ -41,7 +41,7 @@ fn eval_test(code_path: PathBuf, expect_success: bool) { - let tvix_store_io = Rc::new(TvixStoreIO::new( - blob_service, - directory_service, -- path_info_service, -+ path_info_service.into(), - nar_calculation_service.into(), - Arc::new(DummyBuildService::default()), - tokio_runtime.handle().clone(), -diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs -index cfc037e8d..e8c09af0e 100644 ---- a/tvix/glue/src/tvix_store_io.rs -+++ b/tvix/glue/src/tvix_store_io.rs -@@ -648,7 +648,7 @@ mod tests { - let io = Rc::new(TvixStoreIO::new( - blob_service, - directory_service, -- path_info_service, -+ path_info_service.into(), - nar_calculation_service.into(), - Arc::::default(), - tokio_runtime.handle().clone(), -diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs -index 1e4223775..b24f155a7 100644 ---- a/tvix/nar-bridge/src/bin/nar-bridge.rs -+++ b/tvix/nar-bridge/src/bin/nar-bridge.rs -@@ -1,6 +1,7 @@ - use clap::Parser; - use nar_bridge::AppState; - use tracing::info; -+use tvix_store::pathinfoservice::{CachePathInfoService, PathInfoService}; - - /// Expose the Nix HTTP Binary Cache protocol for a tvix-store. - #[derive(Parser)] -@@ -15,6 +16,12 @@ struct Cli { - #[arg(long, env, default_value = "grpc+http://[::1]:8000")] - path_info_service_addr: String, - -+ /// URL to a PathInfoService that's considered "remote". -+ /// If set, the other one is considered "local", and a "cache" for the -+ /// "remote" one. -+ #[arg(long, env)] -+ remote_path_info_service_addr: Option, -+ - /// The priority to announce at the `nix-cache-info` endpoint. - /// A lower number means it's *more preferred. - #[arg(long, env, default_value_t = 39)] -@@ -55,7 +62,25 @@ async fn main() -> Result<(), Box> { - ) - .await?; - -- let state = AppState::new(blob_service, directory_service, path_info_service); -+ // if remote_path_info_service_addr has been specified, -+ // update path_info_service to point to a cache combining the two. -+ let path_info_service = if let Some(addr) = cli.remote_path_info_service_addr { -+ let remote_path_info_service = tvix_store::pathinfoservice::from_addr( -+ &addr, -+ blob_service.clone(), -+ directory_service.clone(), -+ ) -+ .await?; -+ -+ let path_info_service = -+ CachePathInfoService::new(path_info_service, remote_path_info_service); -+ -+ Box::new(path_info_service) as Box -+ } else { -+ path_info_service -+ }; -+ -+ let state = AppState::new(blob_service, directory_service, path_info_service.into()); - - let app = nar_bridge::gen_router(cli.priority).with_state(state); - -diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs -index 99323d2a5..82c73f8f4 100644 ---- a/tvix/store/src/bin/tvix-store.rs -+++ b/tvix/store/src/bin/tvix-store.rs -@@ -16,6 +16,7 @@ use tracing::{info, info_span, instrument, Level, Span}; - use tracing_indicatif::span_ext::IndicatifSpanExt as _; - use tvix_castore::import::fs::ingest_path; - use tvix_store::nar::NarCalculationService; -+use tvix_store::pathinfoservice::CachePathInfoService; - use tvix_store::proto::NarInfo; - use tvix_store::proto::PathInfo; - -@@ -209,38 +210,31 @@ async fn run_cli(cli: Cli) -> Result<(), Box { - // initialize stores -- let mut configs = tvix_store::utils::addrs_to_configs( -- blob_service_addr, -- directory_service_addr, -- path_info_service_addr, -- )?; -+ let (blob_service, directory_service, path_info_service, nar_calculation_service) = -+ tvix_store::utils::construct_services( -+ blob_service_addr, -+ directory_service_addr, -+ path_info_service_addr, -+ ) -+ .await?; - - // if remote_path_info_service_addr has been specified, - // update path_info_service to point to a cache combining the two. -- if let Some(addr) = remote_path_info_service_addr { -- use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG}; -- use tvix_store::pathinfoservice::CachePathInfoServiceConfig; -- -- let remote_url = url::Url::parse(&addr)?; -- let remote_config = with_registry(®, || remote_url.try_into())?; -- -- let local = configs.pathinfoservices.insert( -- "default".into(), -- DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig { -- near: "local".into(), -- far: "remote".into(), -- })), -- ); -- configs -- .pathinfoservices -- .insert("local".into(), local.unwrap()); -- configs -- .pathinfoservices -- .insert("remote".into(), remote_config); -- } -+ let path_info_service = if let Some(addr) = remote_path_info_service_addr { -+ let remote_path_info_service = tvix_store::pathinfoservice::from_addr( -+ &addr, -+ blob_service.clone(), -+ directory_service.clone(), -+ ) -+ .await?; - -- let (blob_service, directory_service, path_info_service, nar_calculation_service) = -- tvix_store::utils::construct_services_from_configs(configs).await?; -+ let path_info_service = -+ CachePathInfoService::new(path_info_service, remote_path_info_service); -+ -+ Box::new(path_info_service) as Box -+ } else { -+ path_info_service -+ }; - - let mut server = Server::builder().layer( - ServiceBuilder::new() -@@ -263,7 +257,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box = path_info_service.into(); - let nar_calculation_service: Arc = - nar_calculation_service.into(); - -@@ -370,6 +365,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box = - serde_json::from_slice(reference_graph_json.as_slice())?; - -+ // Arc the PathInfoService, as we clone it . -+ let path_info_service: Arc = path_info_service.into(); -+ - let lookups_span = info_span!( - "lookup pathinfos", - "indicatif.pb_show" = tracing::field::Empty -@@ -477,7 +475,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box Result<(), Box, - } - -+/// Represents configuration of [BigtablePathInfoService]. -+/// This currently conflates both connect parameters and data model/client -+/// behaviour parameters. -+#[serde_as] -+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] -+pub struct BigtableParameters { -+ project_id: String, -+ instance_name: String, -+ #[serde(default)] -+ is_read_only: bool, -+ #[serde(default = "default_channel_size")] -+ channel_size: usize, -+ -+ #[serde_as(as = "Option>")] -+ #[serde(default = "default_timeout")] -+ timeout: Option, -+ table_name: String, -+ family_name: String, -+ -+ #[serde(default = "default_app_profile_id")] -+ app_profile_id: String, -+} -+ -+impl BigtableParameters { -+ #[cfg(test)] -+ pub fn default_for_tests() -> Self { -+ Self { -+ project_id: "project-1".into(), -+ instance_name: "instance-1".into(), -+ is_read_only: false, -+ channel_size: default_channel_size(), -+ timeout: default_timeout(), -+ table_name: "table-1".into(), -+ family_name: "cf1".into(), -+ app_profile_id: default_app_profile_id(), -+ } -+ } -+} -+ -+fn default_app_profile_id() -> String { -+ "default".to_owned() -+} -+ -+fn default_channel_size() -> usize { -+ 4 -+} -+ -+fn default_timeout() -> Option { -+ Some(std::time::Duration::from_secs(4)) -+} -+ - impl BigtablePathInfoService { - #[cfg(not(test))] - pub async fn connect(params: BigtableParameters) -> Result { -@@ -363,88 +412,3 @@ impl PathInfoService for BigtablePathInfoService { - Box::pin(stream) - } - } -- --/// Represents configuration of [BigtablePathInfoService]. --/// This currently conflates both connect parameters and data model/client --/// behaviour parameters. --#[serde_as] --#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] --pub struct BigtableParameters { -- project_id: String, -- instance_name: String, -- #[serde(default)] -- is_read_only: bool, -- #[serde(default = "default_channel_size")] -- channel_size: usize, -- -- #[serde_as(as = "Option>")] -- #[serde(default = "default_timeout")] -- timeout: Option, -- table_name: String, -- family_name: String, -- -- #[serde(default = "default_app_profile_id")] -- app_profile_id: String, --} -- --impl BigtableParameters { -- #[cfg(test)] -- pub fn default_for_tests() -> Self { -- Self { -- project_id: "project-1".into(), -- instance_name: "instance-1".into(), -- is_read_only: false, -- channel_size: default_channel_size(), -- timeout: default_timeout(), -- table_name: "table-1".into(), -- family_name: "cf1".into(), -- app_profile_id: default_app_profile_id(), -- } -- } --} -- --fn default_app_profile_id() -> String { -- "default".to_owned() --} -- --fn default_channel_size() -> usize { -- 4 --} -- --fn default_timeout() -> Option { -- Some(std::time::Duration::from_secs(4)) --} -- --#[async_trait] --impl ServiceBuilder for BigtableParameters { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- _context: &CompositionContext, -- ) -> Result, Box> { -- Ok(Arc::new( -- BigtablePathInfoService::connect(self.clone()).await?, -- )) -- } --} -- --impl TryFrom for BigtableParameters { -- type Error = Box; -- fn try_from(mut url: url::Url) -> Result { -- // parse the instance name from the hostname. -- let instance_name = url -- .host_str() -- .ok_or_else(|| Error::StorageError("instance name missing".into()))? -- .to_string(); -- -- // … but add it to the query string now, so we just need to parse that. -- url.query_pairs_mut() -- .append_pair("instance_name", &instance_name); -- -- let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) -- .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; -- -- Ok(params) -- } --} -diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs -index bb5595f72..664144ef4 100644 ---- a/tvix/store/src/pathinfoservice/combinators.rs -+++ b/tvix/store/src/pathinfoservice/combinators.rs -@@ -1,11 +1,8 @@ --use std::sync::Arc; -- - use crate::proto::PathInfo; - use futures::stream::BoxStream; - use nix_compat::nixbase32; - use tonic::async_trait; - use tracing::{debug, instrument}; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::Error; - - use super::PathInfoService; -@@ -64,41 +61,6 @@ where - } - } - --#[derive(serde::Deserialize)] --pub struct CacheConfig { -- pub near: String, -- pub far: String, --} -- --impl TryFrom for CacheConfig { -- type Error = Box; -- fn try_from(_url: url::Url) -> Result { -- Err(Error::StorageError( -- "Instantiating a CombinedPathInfoService from a url is not supported".into(), -- ) -- .into()) -- } --} -- --#[async_trait] --impl ServiceBuilder for CacheConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- context: &CompositionContext, -- ) -> Result, Box> { -- let (near, far) = futures::join!( -- context.resolve(self.near.clone()), -- context.resolve(self.far.clone()) -- ); -- Ok(Arc::new(Cache { -- near: near?, -- far: far?, -- })) -- } --} -- - #[cfg(test)] - mod test { - use std::num::NonZeroUsize; -diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs -index 5635c226c..9173d25d0 100644 ---- a/tvix/store/src/pathinfoservice/from_addr.rs -+++ b/tvix/store/src/pathinfoservice/from_addr.rs -@@ -1,10 +1,13 @@ --use super::PathInfoService; -+use crate::proto::path_info_service_client::PathInfoServiceClient; - --use crate::composition::{ -- with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG, -+use super::{ -+ GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService, -+ SledPathInfoService, - }; -+ -+use nix_compat::narinfo; - use std::sync::Arc; --use tvix_castore::Error; -+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error}; - use url::Url; - - /// Constructs a new instance of a [PathInfoService] from an URI. -@@ -31,21 +34,113 @@ use url::Url; - /// these also need to be passed in. - pub async fn from_addr( - uri: &str, -- context: Option<&CompositionContext<'_>>, --) -> Result, Box> { -+ blob_service: Arc, -+ directory_service: Arc, -+) -> Result, Error> { - #[allow(unused_mut)] - let mut url = - Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?; - -- let path_info_service_config = with_registry(®, || { -- >>>::try_from( -- url, -- ) -- })? -- .0; -- let path_info_service = path_info_service_config -- .build("anonymous", context.unwrap_or(&CompositionContext::blank())) -- .await?; -+ let path_info_service: Box = match url.scheme() { -+ "memory" => { -+ // memory doesn't support host or path in the URL. -+ if url.has_host() || !url.path().is_empty() { -+ return Err(Error::StorageError("invalid url".to_string())); -+ } -+ Box::::default() -+ } -+ "sled" => { -+ // sled doesn't support host, and a path can be provided (otherwise -+ // it'll live in memory only). -+ if url.has_host() { -+ return Err(Error::StorageError("no host allowed".to_string())); -+ } -+ -+ if url.path() == "/" { -+ return Err(Error::StorageError( -+ "cowardly refusing to open / with sled".to_string(), -+ )); -+ } -+ -+ // TODO: expose other parameters as URL parameters? -+ -+ Box::new(if url.path().is_empty() { -+ SledPathInfoService::new_temporary() -+ .map_err(|e| Error::StorageError(e.to_string()))? -+ } else { -+ SledPathInfoService::new(url.path()) -+ .map_err(|e| Error::StorageError(e.to_string()))? -+ }) -+ } -+ "nix+http" | "nix+https" => { -+ // Stringify the URL and remove the nix+ prefix. -+ // We can't use `url.set_scheme(rest)`, as it disallows -+ // setting something http(s) that previously wasn't. -+ let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap(); -+ -+ let mut nix_http_path_info_service = -+ NixHTTPPathInfoService::new(new_url, blob_service, directory_service); -+ -+ let pairs = &url.query_pairs(); -+ for (k, v) in pairs.into_iter() { -+ if k == "trusted-public-keys" { -+ let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect(); -+ -+ let mut pubkeys: Vec = Vec::with_capacity(pubkey_strs.len()); -+ for pubkey_str in pubkey_strs { -+ pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| { -+ Error::StorageError(format!("invalid public key: {e}")) -+ })?); -+ } -+ -+ nix_http_path_info_service.set_public_keys(pubkeys); -+ } -+ } -+ -+ Box::new(nix_http_path_info_service) -+ } -+ scheme if scheme.starts_with("grpc+") => { -+ // schemes starting with grpc+ go to the GRPCPathInfoService. -+ // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. -+ // - In the case of unix sockets, there must be a path, but may not be a host. -+ // - In the case of non-unix sockets, there must be a host, but no path. -+ // Constructing the channel is handled by tvix_castore::channel::from_url. -+ Box::new(GRPCPathInfoService::from_client( -+ PathInfoServiceClient::with_interceptor( -+ tvix_castore::tonic::channel_from_url(&url).await?, -+ tvix_tracing::propagate::tonic::send_trace, -+ ), -+ )) -+ } -+ #[cfg(feature = "cloud")] -+ "bigtable" => { -+ use super::bigtable::BigtableParameters; -+ use super::BigtablePathInfoService; -+ -+ // parse the instance name from the hostname. -+ let instance_name = url -+ .host_str() -+ .ok_or_else(|| Error::StorageError("instance name missing".into()))? -+ .to_string(); -+ -+ // … but add it to the query string now, so we just need to parse that. -+ url.query_pairs_mut() -+ .append_pair("instance_name", &instance_name); -+ -+ let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default()) -+ .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?; -+ -+ Box::new( -+ BigtablePathInfoService::connect(params) -+ .await -+ .map_err(|e| Error::StorageError(e.to_string()))?, -+ ) -+ } -+ _ => Err(Error::StorageError(format!( -+ "unknown scheme: {}", -+ url.scheme() -+ )))?, -+ }; - - Ok(path_info_service) - } -@@ -53,12 +148,14 @@ pub async fn from_addr( - #[cfg(test)] - mod tests { - use super::from_addr; -- use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder}; - use lazy_static::lazy_static; - use rstest::rstest; -+ use std::sync::Arc; - use tempfile::TempDir; -- use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig}; -- use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig}; -+ use tvix_castore::{ -+ blobservice::{BlobService, MemoryBlobService}, -+ directoryservice::{DirectoryService, MemoryDirectoryService}, -+ }; - - lazy_static! { - static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap(); -@@ -127,19 +224,11 @@ mod tests { - )] - #[tokio::test] - async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) { -- let mut comp = Composition::default(); -- comp.extend(vec![( -- "default".into(), -- DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {}) -- as Box>), -- )]); -- comp.extend(vec![( -- "default".into(), -- DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {}) -- as Box>), -- )]); -- -- let resp = from_addr(uri_str, Some(&comp.context())).await; -+ let blob_service: Arc = Arc::from(MemoryBlobService::default()); -+ let directory_service: Arc = -+ Arc::from(MemoryDirectoryService::default()); -+ -+ let resp = from_addr(uri_str, blob_service, directory_service).await; - - if exp_succeed { - resp.expect("should succeed"); -diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs -index 2ac0e4330..bcee49aac 100644 ---- a/tvix/store/src/pathinfoservice/grpc.rs -+++ b/tvix/store/src/pathinfoservice/grpc.rs -@@ -6,11 +6,9 @@ use crate::{ - use async_stream::try_stream; - use futures::stream::BoxStream; - use nix_compat::nixbase32; --use std::sync::Arc; - use tonic::{async_trait, Code}; - use tracing::{instrument, Span}; - use tracing_indicatif::span_ext::IndicatifSpanExt; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::{proto as castorepb, Error}; - - /// Connects to a (remote) tvix-store PathInfoService over gRPC. -@@ -151,40 +149,6 @@ where - } - } - --#[derive(serde::Deserialize, Debug)] --#[serde(deny_unknown_fields)] --pub struct GRPCPathInfoServiceConfig { -- url: String, --} -- --impl TryFrom for GRPCPathInfoServiceConfig { -- type Error = Box; -- fn try_from(url: url::Url) -> Result { -- // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts. -- // - In the case of unix sockets, there must be a path, but may not be a host. -- // - In the case of non-unix sockets, there must be a host, but no path. -- // Constructing the channel is handled by tvix_castore::channel::from_url. -- Ok(GRPCPathInfoServiceConfig { -- url: url.to_string(), -- }) -- } --} -- --#[async_trait] --impl ServiceBuilder for GRPCPathInfoServiceConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- _context: &CompositionContext, -- ) -> Result, Box> { -- let client = proto::path_info_service_client::PathInfoServiceClient::new( -- tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?, -- ); -- Ok(Arc::new(GRPCPathInfoService::from_client(client))) -- } --} -- - #[cfg(test)] - mod tests { - use crate::pathinfoservice::tests::make_grpc_path_info_service_client; -diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs -index 39c592bc9..da674f497 100644 ---- a/tvix/store/src/pathinfoservice/lru.rs -+++ b/tvix/store/src/pathinfoservice/lru.rs -@@ -9,7 +9,6 @@ use tonic::async_trait; - use tracing::instrument; - - use crate::proto::PathInfo; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::Error; - - use super::PathInfoService; -@@ -61,34 +60,6 @@ impl PathInfoService for LruPathInfoService { - } - } - --#[derive(serde::Deserialize, Debug)] --#[serde(deny_unknown_fields)] --pub struct LruPathInfoServiceConfig { -- capacity: NonZeroUsize, --} -- --impl TryFrom for LruPathInfoServiceConfig { -- type Error = Box; -- fn try_from(_url: url::Url) -> Result { -- Err(Error::StorageError( -- "Instantiating a LruPathInfoService from a url is not supported".into(), -- ) -- .into()) -- } --} -- --#[async_trait] --impl ServiceBuilder for LruPathInfoServiceConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- _context: &CompositionContext, -- ) -> Result, Box> { -- Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity))) -- } --} -- - #[cfg(test)] - mod test { - use std::num::NonZeroUsize; -diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs -index 3fabd239c..3de3221df 100644 ---- a/tvix/store/src/pathinfoservice/memory.rs -+++ b/tvix/store/src/pathinfoservice/memory.rs -@@ -7,7 +7,6 @@ use std::{collections::HashMap, sync::Arc}; - use tokio::sync::RwLock; - use tonic::async_trait; - use tracing::instrument; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::Error; - - #[derive(Default)] -@@ -60,30 +59,3 @@ impl PathInfoService for MemoryPathInfoService { - }) - } - } -- --#[derive(serde::Deserialize, Debug)] --#[serde(deny_unknown_fields)] --pub struct MemoryPathInfoServiceConfig {} -- --impl TryFrom for MemoryPathInfoServiceConfig { -- type Error = Box; -- fn try_from(url: url::Url) -> Result { -- // memory doesn't support host or path in the URL. -- if url.has_host() || !url.path().is_empty() { -- return Err(Error::StorageError("invalid url".to_string()).into()); -- } -- Ok(MemoryPathInfoServiceConfig {}) -- } --} -- --#[async_trait] --impl ServiceBuilder for MemoryPathInfoServiceConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- _context: &CompositionContext, -- ) -> Result, Box> { -- Ok(Arc::new(MemoryPathInfoService::default())) -- } --} -diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs -index 70f752f22..574bcc0b8 100644 ---- a/tvix/store/src/pathinfoservice/mod.rs -+++ b/tvix/store/src/pathinfoservice/mod.rs -@@ -14,26 +14,22 @@ mod tests; - - use futures::stream::BoxStream; - use tonic::async_trait; --use tvix_castore::composition::{Registry, ServiceBuilder}; - use tvix_castore::Error; - --use crate::nar::NarCalculationService; - use crate::proto::PathInfo; - --pub use self::combinators::{ -- Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig, --}; -+pub use self::combinators::Cache as CachePathInfoService; - pub use self::from_addr::from_addr; --pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig}; --pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig}; --pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig}; --pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig}; --pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig}; -+pub use self::grpc::GRPCPathInfoService; -+pub use self::lru::LruPathInfoService; -+pub use self::memory::MemoryPathInfoService; -+pub use self::nix_http::NixHTTPPathInfoService; -+pub use self::sled::SledPathInfoService; - - #[cfg(feature = "cloud")] - mod bigtable; - #[cfg(feature = "cloud")] --pub use self::bigtable::{BigtableParameters, BigtablePathInfoService}; -+pub use self::bigtable::BigtablePathInfoService; - - #[cfg(any(feature = "fuse", feature = "virtiofs"))] - pub use self::fs::make_fs; -@@ -56,16 +52,12 @@ pub trait PathInfoService: Send + Sync { - /// Rust doesn't support this as a generic in traits yet. This is the same thing that - /// [async_trait] generates, but for streams instead of futures. - fn list(&self) -> BoxStream<'static, Result>; -- -- fn nar_calculation_service(&self) -> Option> { -- None -- } - } - - #[async_trait] - impl PathInfoService for A - where -- A: AsRef + Send + Sync + 'static, -+ A: AsRef + Send + Sync, - { - async fn get(&self, digest: [u8; 20]) -> Result, Error> { - self.as_ref().get(digest).await -@@ -79,19 +71,3 @@ where - self.as_ref().list() - } - } -- --/// Registers the builtin PathInfoService implementations with the registry --pub(crate) fn register_pathinfo_services(reg: &mut Registry) { -- reg.register::>, CachePathInfoServiceConfig>("cache"); -- reg.register::>, GRPCPathInfoServiceConfig>("grpc"); -- reg.register::>, LruPathInfoServiceConfig>("lru"); -- reg.register::>, MemoryPathInfoServiceConfig>("memory"); -- reg.register::>, NixHTTPPathInfoServiceConfig>("nix"); -- reg.register::>, SledPathInfoServiceConfig>("sled"); -- #[cfg(feature = "cloud")] -- { -- reg.register::>, BigtableParameters>( -- "bigtable", -- ); -- } --} -diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs -index af9234bc0..57fe37f44 100644 ---- a/tvix/store/src/pathinfoservice/nix_http.rs -+++ b/tvix/store/src/pathinfoservice/nix_http.rs -@@ -7,15 +7,12 @@ use nix_compat::{ - nixhash::NixHash, - }; - use reqwest::StatusCode; --use std::sync::Arc; - use tokio::io::{self, AsyncRead}; - use tonic::async_trait; - use tracing::{debug, instrument, warn}; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::{ - blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error, - }; --use url::Url; - - /// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache - /// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix -@@ -252,71 +249,3 @@ where - })) - } - } -- --#[derive(serde::Deserialize)] --pub struct NixHTTPPathInfoServiceConfig { -- base_url: String, -- blob_service: String, -- directory_service: String, -- #[serde(default)] -- /// An optional list of [narinfo::PubKey]. -- /// If set, the .narinfo files received need to have correct signature by at least one of these. -- public_keys: Option>, --} -- --impl TryFrom for NixHTTPPathInfoServiceConfig { -- type Error = Box; -- fn try_from(url: Url) -> Result { -- let mut public_keys: Option> = None; -- for (_, v) in url -- .query_pairs() -- .into_iter() -- .filter(|(k, _)| k == "trusted-public-keys") -- { -- public_keys -- .get_or_insert(Default::default()) -- .extend(v.split_ascii_whitespace().map(ToString::to_string)); -- } -- Ok(NixHTTPPathInfoServiceConfig { -- // Stringify the URL and remove the nix+ prefix. -- // We can't use `url.set_scheme(rest)`, as it disallows -- // setting something http(s) that previously wasn't. -- base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(), -- blob_service: "default".to_string(), -- directory_service: "default".to_string(), -- public_keys, -- }) -- } --} -- --#[async_trait] --impl ServiceBuilder for NixHTTPPathInfoServiceConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- context: &CompositionContext, -- ) -> Result, Box> { -- let (blob_service, directory_service) = futures::join!( -- context.resolve(self.blob_service.clone()), -- context.resolve(self.directory_service.clone()) -- ); -- let mut svc = NixHTTPPathInfoService::new( -- Url::parse(&self.base_url)?, -- blob_service?, -- directory_service?, -- ); -- if let Some(public_keys) = &self.public_keys { -- svc.set_public_keys( -- public_keys -- .iter() -- .map(|pubkey_str| { -- narinfo::PubKey::parse(pubkey_str) -- .map_err(|e| Error::StorageError(format!("invalid public key: {e}"))) -- }) -- .collect::, Error>>()?, -- ); -- } -- Ok(Arc::new(svc)) -- } --} -diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs -index 4255bfd1d..96ade1816 100644 ---- a/tvix/store/src/pathinfoservice/sled.rs -+++ b/tvix/store/src/pathinfoservice/sled.rs -@@ -5,10 +5,8 @@ use futures::stream::BoxStream; - use nix_compat::nixbase32; - use prost::Message; - use std::path::Path; --use std::sync::Arc; - use tonic::async_trait; - use tracing::{instrument, warn}; --use tvix_castore::composition::{CompositionContext, ServiceBuilder}; - use tvix_castore::Error; - - /// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled). -@@ -116,69 +114,3 @@ impl PathInfoService for SledPathInfoService { - }) - } - } -- --#[derive(serde::Deserialize)] --#[serde(deny_unknown_fields)] --pub struct SledPathInfoServiceConfig { -- is_temporary: bool, -- #[serde(default)] -- /// required when is_temporary = false -- path: Option, --} -- --impl TryFrom for SledPathInfoServiceConfig { -- type Error = Box; -- fn try_from(url: url::Url) -> Result { -- // sled doesn't support host, and a path can be provided (otherwise -- // it'll live in memory only). -- if url.has_host() { -- return Err(Error::StorageError("no host allowed".to_string()).into()); -- } -- -- // TODO: expose compression and other parameters as URL parameters? -- -- Ok(if url.path().is_empty() { -- SledPathInfoServiceConfig { -- is_temporary: true, -- path: None, -- } -- } else { -- SledPathInfoServiceConfig { -- is_temporary: false, -- path: Some(url.path().to_string()), -- } -- }) -- } --} -- --#[async_trait] --impl ServiceBuilder for SledPathInfoServiceConfig { -- type Output = dyn PathInfoService; -- async fn build<'a>( -- &'a self, -- _instance_name: &str, -- _context: &CompositionContext, -- ) -> Result, Box> { -- match self { -- SledPathInfoServiceConfig { -- is_temporary: true, -- path: None, -- } => Ok(Arc::new(SledPathInfoService::new_temporary()?)), -- SledPathInfoServiceConfig { -- is_temporary: true, -- path: Some(_), -- } => Err( -- Error::StorageError("Temporary SledPathInfoService can not have path".into()) -- .into(), -- ), -- SledPathInfoServiceConfig { -- is_temporary: false, -- path: None, -- } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()), -- SledPathInfoServiceConfig { -- is_temporary: false, -- path: Some(path), -- } => Ok(Arc::new(SledPathInfoService::new(path)?)), -- } -- } --} -diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs -index a09786386..d82f2214f 100644 ---- a/tvix/store/src/utils.rs -+++ b/tvix/store/src/utils.rs -@@ -1,60 +1,18 @@ -+use std::sync::Arc; - use std::{ -- collections::HashMap, - pin::Pin, -- sync::Arc, - task::{self, Poll}, - }; - use tokio::io::{self, AsyncWrite}; - --use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService}; -+use tvix_castore::{ -+ blobservice::{self, BlobService}, -+ directoryservice::{self, DirectoryService}, -+}; - use url::Url; - --use crate::composition::{ -- with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG, --}; - use crate::nar::{NarCalculationService, SimpleRenderer}; --use crate::pathinfoservice::PathInfoService; -- --#[derive(serde::Deserialize, Default)] --pub struct CompositionConfigs { -- pub blobservices: -- HashMap>>>, -- pub directoryservices: HashMap< -- String, -- DeserializeWithRegistry>>, -- >, -- pub pathinfoservices: HashMap< -- String, -- DeserializeWithRegistry>>, -- >, --} -- --pub fn addrs_to_configs( -- blob_service_addr: impl AsRef, -- directory_service_addr: impl AsRef, -- path_info_service_addr: impl AsRef, --) -> Result> { -- let mut configs: CompositionConfigs = Default::default(); -- -- let blob_service_url = Url::parse(blob_service_addr.as_ref())?; -- let directory_service_url = Url::parse(directory_service_addr.as_ref())?; -- let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?; -- -- configs.blobservices.insert( -- "default".into(), -- with_registry(®, || blob_service_url.try_into())?, -- ); -- configs.directoryservices.insert( -- "default".into(), -- with_registry(®, || directory_service_url.try_into())?, -- ); -- configs.pathinfoservices.insert( -- "default".into(), -- with_registry(®, || path_info_service_url.try_into())?, -- ); -- -- Ok(configs) --} -+use crate::pathinfoservice::{self, PathInfoService}; - - /// Construct the store handles from their addrs. - pub async fn construct_services( -@@ -65,52 +23,49 @@ pub async fn construct_services( - ( - Arc, - Arc, -- Arc, -+ Box, - Box, - ), - Box, - > { -- let configs = addrs_to_configs( -- blob_service_addr, -- directory_service_addr, -- path_info_service_addr, -- )?; -- construct_services_from_configs(configs).await --} -- --/// Construct the store handles from their addrs. --pub async fn construct_services_from_configs( -- configs: CompositionConfigs, --) -> Result< -- ( -- Arc, -- Arc, -- Arc, -- Box, -- ), -- Box, --> { -- let mut comp = Composition::default(); -- -- comp.extend(configs.blobservices); -- comp.extend(configs.directoryservices); -- comp.extend(configs.pathinfoservices); -- -- let blob_service: Arc = comp.build("default").await?; -- let directory_service: Arc = comp.build("default").await?; -- let path_info_service: Arc = comp.build("default").await?; -+ let blob_service: Arc = -+ blobservice::from_addr(blob_service_addr.as_ref()).await?; -+ let directory_service: Arc = -+ directoryservice::from_addr(directory_service_addr.as_ref()).await?; -+ -+ let path_info_service = pathinfoservice::from_addr( -+ path_info_service_addr.as_ref(), -+ blob_service.clone(), -+ directory_service.clone(), -+ ) -+ .await?; - - // HACK: The grpc client also implements NarCalculationService, and we - // really want to use it (otherwise we'd need to fetch everything again for hashing). - // Until we revamped store composition and config, detect this special case here. -- let nar_calculation_service: Box = path_info_service -- .nar_calculation_service() -- .unwrap_or_else(|| { -+ let nar_calculation_service: Box = { -+ use crate::pathinfoservice::GRPCPathInfoService; -+ use crate::proto::path_info_service_client::PathInfoServiceClient; -+ -+ let url = Url::parse(path_info_service_addr.as_ref()) -+ .map_err(|e| io::Error::other(e.to_string()))?; -+ -+ if url.scheme().starts_with("grpc+") { -+ Box::new(GRPCPathInfoService::from_client( -+ PathInfoServiceClient::with_interceptor( -+ tvix_castore::tonic::channel_from_url(&url) -+ .await -+ .map_err(|e| io::Error::other(e.to_string()))?, -+ tvix_tracing::propagate::tonic::send_trace, -+ ), -+ )) -+ } else { - Box::new(SimpleRenderer::new( - blob_service.clone(), - directory_service.clone(), -- )) -- }); -+ )) as Box -+ } -+ }; - - Ok(( - blob_service, diff --git a/pkgs/tvix/default.nix b/pkgs/tvix/default.nix index bcab008..1a020ca 100644 --- a/pkgs/tvix/default.nix +++ b/pkgs/tvix/default.nix @@ -1,5 +1,6 @@ { fetchgit, + fetchpatch, rustPlatform, protobuf, packages ? [ ], @@ -12,12 +13,18 @@ rustPlatform.buildRustPackage rec { src = fetchgit { name = "tvix"; - url = "https://cl.tvl.fyi/depot"; - rev = "507a5c1b7376a2f9617b286f142ec8115cf19a6e"; - hash = "sha256-QTAzM8b59NtQSjvlu5mcO13t4T58Mv00ax7/RnRENwk="; + url = "https://code.tvl.fyi/depot.git"; + rev = "e97202e54a02d0717f0457fec0863e3e74fc93d8"; + hash = "sha256-V/3vSjLp28hv1vJowvy9NlBp7gV6GayHzj4iUN6dek0="; }; - patches = [ ]; + patches = [ + (fetchpatch { + url = "https://cl.tvl.fyi/changes/depot~12015/revisions/1/patch?download"; + decode = "base64 -d"; + hash = "sha256-2KHXwLp5unhC8albVT6Cimw2zZCes93lDdM5jckvxd0="; + }) + ]; postPatch = "cd tvix"; doCheck = false;