binary-cache/pkgs/tvix/01-remote-path-info-nar-bridge.patch

1264 lines
46 KiB
Diff
Raw Normal View History

diff --git a/tvix/castore/src/composition.rs b/tvix/castore/src/composition.rs
index daa8a5008..18a767284 100644
--- a/tvix/castore/src/composition.rs
+++ b/tvix/castore/src/composition.rs
@@ -454,11 +454,4 @@ impl Composition {
*entry = Box::new(new_val);
ret
}
-
- pub fn context(&self) -> CompositionContext {
- CompositionContext {
- stack: vec![],
- composition: Some(self),
- }
- }
}
diff --git a/tvix/cli/src/lib.rs b/tvix/cli/src/lib.rs
index 070e88e35..800ffb4e0 100644
--- a/tvix/cli/src/lib.rs
+++ b/tvix/cli/src/lib.rs
@@ -60,7 +60,7 @@ pub fn init_io_handle(tokio_runtime: &tokio::runtime::Runtime, args: &Args) -> R
Rc::new(TvixStoreIO::new(
blob_service.clone(),
directory_service.clone(),
- path_info_service,
+ path_info_service.into(),
nar_calculation_service.into(),
build_service.into(),
tokio_runtime.handle().clone(),
diff --git a/tvix/glue/benches/eval.rs b/tvix/glue/benches/eval.rs
index ee3d554dc..55f37f2d5 100644
--- a/tvix/glue/benches/eval.rs
+++ b/tvix/glue/benches/eval.rs
@@ -34,7 +34,7 @@ fn interpret(code: &str) {
let tvix_store_io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
- path_info_service,
+ path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
TOKIO_RUNTIME.handle().clone(),
diff --git a/tvix/glue/src/builtins/mod.rs b/tvix/glue/src/builtins/mod.rs
index e1d1c9c84..e1c20e461 100644
--- a/tvix/glue/src/builtins/mod.rs
+++ b/tvix/glue/src/builtins/mod.rs
@@ -80,7 +80,7 @@ mod tests {
let io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
- path_info_service,
+ path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
runtime.handle().clone(),
diff --git a/tvix/glue/src/tests/mod.rs b/tvix/glue/src/tests/mod.rs
index e9f21c329..142ae1a07 100644
--- a/tvix/glue/src/tests/mod.rs
+++ b/tvix/glue/src/tests/mod.rs
@@ -41,7 +41,7 @@ fn eval_test(code_path: PathBuf, expect_success: bool) {
let tvix_store_io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
- path_info_service,
+ path_info_service.into(),
nar_calculation_service.into(),
Arc::new(DummyBuildService::default()),
tokio_runtime.handle().clone(),
diff --git a/tvix/glue/src/tvix_store_io.rs b/tvix/glue/src/tvix_store_io.rs
index cfc037e8d..e8c09af0e 100644
--- a/tvix/glue/src/tvix_store_io.rs
+++ b/tvix/glue/src/tvix_store_io.rs
@@ -648,7 +648,7 @@ mod tests {
let io = Rc::new(TvixStoreIO::new(
blob_service,
directory_service,
- path_info_service,
+ path_info_service.into(),
nar_calculation_service.into(),
Arc::<DummyBuildService>::default(),
tokio_runtime.handle().clone(),
diff --git a/tvix/nar-bridge/src/bin/nar-bridge.rs b/tvix/nar-bridge/src/bin/nar-bridge.rs
index 1e4223775..b24f155a7 100644
--- a/tvix/nar-bridge/src/bin/nar-bridge.rs
+++ b/tvix/nar-bridge/src/bin/nar-bridge.rs
@@ -1,6 +1,7 @@
use clap::Parser;
use nar_bridge::AppState;
use tracing::info;
+use tvix_store::pathinfoservice::{CachePathInfoService, PathInfoService};
/// Expose the Nix HTTP Binary Cache protocol for a tvix-store.
#[derive(Parser)]
@@ -15,6 +16,12 @@ struct Cli {
#[arg(long, env, default_value = "grpc+http://[::1]:8000")]
path_info_service_addr: String,
+ /// URL to a PathInfoService that's considered "remote".
+ /// If set, the other one is considered "local", and a "cache" for the
+ /// "remote" one.
+ #[arg(long, env)]
+ remote_path_info_service_addr: Option<String>,
+
/// The priority to announce at the `nix-cache-info` endpoint.
/// A lower number means it's *more preferred.
#[arg(long, env, default_value_t = 39)]
@@ -55,7 +62,25 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
)
.await?;
- let state = AppState::new(blob_service, directory_service, path_info_service);
+ // if remote_path_info_service_addr has been specified,
+ // update path_info_service to point to a cache combining the two.
+ let path_info_service = if let Some(addr) = cli.remote_path_info_service_addr {
+ let remote_path_info_service = tvix_store::pathinfoservice::from_addr(
+ &addr,
+ blob_service.clone(),
+ directory_service.clone(),
+ )
+ .await?;
+
+ let path_info_service =
+ CachePathInfoService::new(path_info_service, remote_path_info_service);
+
+ Box::new(path_info_service) as Box<dyn PathInfoService>
+ } else {
+ path_info_service
+ };
+
+ let state = AppState::new(blob_service, directory_service, path_info_service.into());
let app = nar_bridge::gen_router(cli.priority).with_state(state);
diff --git a/tvix/store/src/bin/tvix-store.rs b/tvix/store/src/bin/tvix-store.rs
index 99323d2a5..82c73f8f4 100644
--- a/tvix/store/src/bin/tvix-store.rs
+++ b/tvix/store/src/bin/tvix-store.rs
@@ -16,6 +16,7 @@ use tracing::{info, info_span, instrument, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt as _;
use tvix_castore::import::fs::ingest_path;
use tvix_store::nar::NarCalculationService;
+use tvix_store::pathinfoservice::CachePathInfoService;
use tvix_store::proto::NarInfo;
use tvix_store::proto::PathInfo;
@@ -209,38 +210,31 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
remote_path_info_service_addr,
} => {
// initialize stores
- let mut configs = tvix_store::utils::addrs_to_configs(
- blob_service_addr,
- directory_service_addr,
- path_info_service_addr,
- )?;
+ let (blob_service, directory_service, path_info_service, nar_calculation_service) =
+ tvix_store::utils::construct_services(
+ blob_service_addr,
+ directory_service_addr,
+ path_info_service_addr,
+ )
+ .await?;
// if remote_path_info_service_addr has been specified,
// update path_info_service to point to a cache combining the two.
- if let Some(addr) = remote_path_info_service_addr {
- use tvix_store::composition::{with_registry, DeserializeWithRegistry, REG};
- use tvix_store::pathinfoservice::CachePathInfoServiceConfig;
-
- let remote_url = url::Url::parse(&addr)?;
- let remote_config = with_registry(&REG, || remote_url.try_into())?;
-
- let local = configs.pathinfoservices.insert(
- "default".into(),
- DeserializeWithRegistry(Box::new(CachePathInfoServiceConfig {
- near: "local".into(),
- far: "remote".into(),
- })),
- );
- configs
- .pathinfoservices
- .insert("local".into(), local.unwrap());
- configs
- .pathinfoservices
- .insert("remote".into(), remote_config);
- }
+ let path_info_service = if let Some(addr) = remote_path_info_service_addr {
+ let remote_path_info_service = tvix_store::pathinfoservice::from_addr(
+ &addr,
+ blob_service.clone(),
+ directory_service.clone(),
+ )
+ .await?;
- let (blob_service, directory_service, path_info_service, nar_calculation_service) =
- tvix_store::utils::construct_services_from_configs(configs).await?;
+ let path_info_service =
+ CachePathInfoService::new(path_info_service, remote_path_info_service);
+
+ Box::new(path_info_service) as Box<dyn PathInfoService>
+ } else {
+ path_info_service
+ };
let mut server = Server::builder().layer(
ServiceBuilder::new()
@@ -263,7 +257,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
GRPCDirectoryServiceWrapper::new(directory_service),
))
.add_service(PathInfoServiceServer::new(GRPCPathInfoServiceWrapper::new(
- path_info_service,
+ Arc::from(path_info_service),
nar_calculation_service,
)));
@@ -308,7 +302,8 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
)
.await?;
- // Arc NarCalculationService, as we clone it .
+ // Arc PathInfoService and NarCalculationService, as we clone them.
+ let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
let nar_calculation_service: Arc<dyn NarCalculationService> =
nar_calculation_service.into();
@@ -370,6 +365,9 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let reference_graph: ReferenceGraph<'_> =
serde_json::from_slice(reference_graph_json.as_slice())?;
+ // Arc the PathInfoService, as we clone it.
+ let path_info_service: Arc<dyn PathInfoService> = path_info_service.into();
+
let lookups_span = info_span!(
"lookup pathinfos",
"indicatif.pb_show" = tracing::field::Empty
@@ -477,7 +475,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let fs = make_fs(
blob_service,
directory_service,
- path_info_service,
+ Arc::from(path_info_service),
list_root,
show_xattr,
);
@@ -525,7 +523,7 @@ async fn run_cli(cli: Cli) -> Result<(), Box<dyn std::error::Error + Send + Sync
let fs = make_fs(
blob_service,
directory_service,
- path_info_service,
+ Arc::from(path_info_service),
list_root,
show_xattr,
);
diff --git a/tvix/store/src/composition.rs b/tvix/store/src/composition.rs
deleted file mode 100644
index a32f22cf7..000000000
--- a/tvix/store/src/composition.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-use lazy_static::lazy_static;
-
-pub use tvix_castore::composition::*;
-
-lazy_static! {
- /// The provided registry of tvix_store, which has all the builtin
- /// tvix_castore (BlobStore/DirectoryStore) and tvix_store
- /// (PathInfoService) implementations.
- pub static ref REG: Registry = {
- let mut reg = Default::default();
- add_default_services(&mut reg);
- reg
- };
-}
-
-/// Register the builtin services of tvix_castore and tvix_store with the given
-/// registry. This is useful for creating your own registry with the builtin
-/// types _and_ extra third party types.
-pub fn add_default_services(reg: &mut Registry) {
- tvix_castore::composition::add_default_services(reg);
- crate::pathinfoservice::register_pathinfo_services(reg);
-}
diff --git a/tvix/store/src/lib.rs b/tvix/store/src/lib.rs
index 81a77cd97..8c32aaf88 100644
--- a/tvix/store/src/lib.rs
+++ b/tvix/store/src/lib.rs
@@ -1,4 +1,3 @@
-pub mod composition;
pub mod import;
pub mod nar;
pub mod pathinfoservice;
diff --git a/tvix/store/src/pathinfoservice/bigtable.rs b/tvix/store/src/pathinfoservice/bigtable.rs
index 15128986f..26d07689d 100644
--- a/tvix/store/src/pathinfoservice/bigtable.rs
+++ b/tvix/store/src/pathinfoservice/bigtable.rs
@@ -10,17 +10,15 @@ use nix_compat::nixbase32;
use prost::Message;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};
-use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, trace};
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
/// There should not be more than 10 MiB in a single cell.
/// https://cloud.google.com/bigtable/docs/schema-design#cells
const CELL_SIZE_LIMIT: u64 = 10 * 1024 * 1024;
-/// Provides a [PathInfoService] implementation using
+/// Provides a [PathInfoService] implementation using
/// [Bigtable](https://cloud.google.com/bigtable/docs/)
/// as an underlying K/V store.
///
@@ -46,6 +44,57 @@ pub struct BigtablePathInfoService {
emulator: std::sync::Arc<(tempfile::TempDir, async_process::Child)>,
}
+/// Represents configuration of [BigtablePathInfoService].
+/// This currently conflates both connect parameters and data model/client
+/// behaviour parameters.
+#[serde_as]
+#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
+pub struct BigtableParameters {
+ project_id: String,
+ instance_name: String,
+ #[serde(default)]
+ is_read_only: bool,
+ #[serde(default = "default_channel_size")]
+ channel_size: usize,
+
+ #[serde_as(as = "Option<DurationSeconds<String>>")]
+ #[serde(default = "default_timeout")]
+ timeout: Option<std::time::Duration>,
+ table_name: String,
+ family_name: String,
+
+ #[serde(default = "default_app_profile_id")]
+ app_profile_id: String,
+}
+
+impl BigtableParameters {
+ #[cfg(test)]
+ pub fn default_for_tests() -> Self {
+ Self {
+ project_id: "project-1".into(),
+ instance_name: "instance-1".into(),
+ is_read_only: false,
+ channel_size: default_channel_size(),
+ timeout: default_timeout(),
+ table_name: "table-1".into(),
+ family_name: "cf1".into(),
+ app_profile_id: default_app_profile_id(),
+ }
+ }
+}
+
+fn default_app_profile_id() -> String {
+ "default".to_owned()
+}
+
+fn default_channel_size() -> usize {
+ 4
+}
+
+fn default_timeout() -> Option<std::time::Duration> {
+ Some(std::time::Duration::from_secs(4))
+}
+
impl BigtablePathInfoService {
#[cfg(not(test))]
pub async fn connect(params: BigtableParameters) -> Result<Self, bigtable::Error> {
@@ -363,88 +412,3 @@ impl PathInfoService for BigtablePathInfoService {
Box::pin(stream)
}
}
-
-/// Represents configuration of [BigtablePathInfoService].
-/// This currently conflates both connect parameters and data model/client
-/// behaviour parameters.
-#[serde_as]
-#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
-pub struct BigtableParameters {
- project_id: String,
- instance_name: String,
- #[serde(default)]
- is_read_only: bool,
- #[serde(default = "default_channel_size")]
- channel_size: usize,
-
- #[serde_as(as = "Option<DurationSeconds<String>>")]
- #[serde(default = "default_timeout")]
- timeout: Option<std::time::Duration>,
- table_name: String,
- family_name: String,
-
- #[serde(default = "default_app_profile_id")]
- app_profile_id: String,
-}
-
-impl BigtableParameters {
- #[cfg(test)]
- pub fn default_for_tests() -> Self {
- Self {
- project_id: "project-1".into(),
- instance_name: "instance-1".into(),
- is_read_only: false,
- channel_size: default_channel_size(),
- timeout: default_timeout(),
- table_name: "table-1".into(),
- family_name: "cf1".into(),
- app_profile_id: default_app_profile_id(),
- }
- }
-}
-
-fn default_app_profile_id() -> String {
- "default".to_owned()
-}
-
-fn default_channel_size() -> usize {
- 4
-}
-
-fn default_timeout() -> Option<std::time::Duration> {
- Some(std::time::Duration::from_secs(4))
-}
-
-#[async_trait]
-impl ServiceBuilder for BigtableParameters {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- _context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
- Ok(Arc::new(
- BigtablePathInfoService::connect(self.clone()).await?,
- ))
- }
-}
-
-impl TryFrom<url::Url> for BigtableParameters {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(mut url: url::Url) -> Result<Self, Self::Error> {
- // parse the instance name from the hostname.
- let instance_name = url
- .host_str()
- .ok_or_else(|| Error::StorageError("instance name missing".into()))?
- .to_string();
-
- // … but add it to the query string now, so we just need to parse that.
- url.query_pairs_mut()
- .append_pair("instance_name", &instance_name);
-
- let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
- .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
-
- Ok(params)
- }
-}
diff --git a/tvix/store/src/pathinfoservice/combinators.rs b/tvix/store/src/pathinfoservice/combinators.rs
index bb5595f72..664144ef4 100644
--- a/tvix/store/src/pathinfoservice/combinators.rs
+++ b/tvix/store/src/pathinfoservice/combinators.rs
@@ -1,11 +1,8 @@
-use std::sync::Arc;
-
use crate::proto::PathInfo;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
use tonic::async_trait;
use tracing::{debug, instrument};
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
use super::PathInfoService;
@@ -64,41 +61,6 @@ where
}
}
-#[derive(serde::Deserialize)]
-pub struct CacheConfig {
- pub near: String,
- pub far: String,
-}
-
-impl TryFrom<url::Url> for CacheConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
- Err(Error::StorageError(
- "Instantiating a CombinedPathInfoService from a url is not supported".into(),
- )
- .into())
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for CacheConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- let (near, far) = futures::join!(
- context.resolve(self.near.clone()),
- context.resolve(self.far.clone())
- );
- Ok(Arc::new(Cache {
- near: near?,
- far: far?,
- }))
- }
-}
-
#[cfg(test)]
mod test {
use std::num::NonZeroUsize;
diff --git a/tvix/store/src/pathinfoservice/from_addr.rs b/tvix/store/src/pathinfoservice/from_addr.rs
index 5635c226c..9173d25d0 100644
--- a/tvix/store/src/pathinfoservice/from_addr.rs
+++ b/tvix/store/src/pathinfoservice/from_addr.rs
@@ -1,10 +1,13 @@
-use super::PathInfoService;
+use crate::proto::path_info_service_client::PathInfoServiceClient;
-use crate::composition::{
- with_registry, CompositionContext, DeserializeWithRegistry, ServiceBuilder, REG,
+use super::{
+ GRPCPathInfoService, MemoryPathInfoService, NixHTTPPathInfoService, PathInfoService,
+ SledPathInfoService,
};
+
+use nix_compat::narinfo;
use std::sync::Arc;
-use tvix_castore::Error;
+use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService, Error};
use url::Url;
/// Constructs a new instance of a [PathInfoService] from an URI.
@@ -31,21 +34,113 @@ use url::Url;
/// these also need to be passed in.
pub async fn from_addr(
uri: &str,
- context: Option<&CompositionContext<'_>>,
-) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync>> {
+ blob_service: Arc<dyn BlobService>,
+ directory_service: Arc<dyn DirectoryService>,
+) -> Result<Box<dyn PathInfoService>, Error> {
#[allow(unused_mut)]
let mut url =
Url::parse(uri).map_err(|e| Error::StorageError(format!("unable to parse url: {}", e)))?;
- let path_info_service_config = with_registry(&REG, || {
- <DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>>::try_from(
- url,
- )
- })?
- .0;
- let path_info_service = path_info_service_config
- .build("anonymous", context.unwrap_or(&CompositionContext::blank()))
- .await?;
+ let path_info_service: Box<dyn PathInfoService> = match url.scheme() {
+ "memory" => {
+ // memory doesn't support host or path in the URL.
+ if url.has_host() || !url.path().is_empty() {
+ return Err(Error::StorageError("invalid url".to_string()));
+ }
+ Box::<MemoryPathInfoService>::default()
+ }
+ "sled" => {
+ // sled doesn't support host, and a path can be provided (otherwise
+ // it'll live in memory only).
+ if url.has_host() {
+ return Err(Error::StorageError("no host allowed".to_string()));
+ }
+
+ if url.path() == "/" {
+ return Err(Error::StorageError(
+ "cowardly refusing to open / with sled".to_string(),
+ ));
+ }
+
+ // TODO: expose other parameters as URL parameters?
+
+ Box::new(if url.path().is_empty() {
+ SledPathInfoService::new_temporary()
+ .map_err(|e| Error::StorageError(e.to_string()))?
+ } else {
+ SledPathInfoService::new(url.path())
+ .map_err(|e| Error::StorageError(e.to_string()))?
+ })
+ }
+ "nix+http" | "nix+https" => {
+ // Stringify the URL and remove the nix+ prefix.
+ // We can't use `url.set_scheme(rest)`, as it disallows
+ // setting something http(s) that previously wasn't.
+ let new_url = Url::parse(url.to_string().strip_prefix("nix+").unwrap()).unwrap();
+
+ let mut nix_http_path_info_service =
+ NixHTTPPathInfoService::new(new_url, blob_service, directory_service);
+
+ let pairs = &url.query_pairs();
+ for (k, v) in pairs.into_iter() {
+ if k == "trusted-public-keys" {
+ let pubkey_strs: Vec<_> = v.split_ascii_whitespace().collect();
+
+ let mut pubkeys: Vec<narinfo::PubKey> = Vec::with_capacity(pubkey_strs.len());
+ for pubkey_str in pubkey_strs {
+ pubkeys.push(narinfo::PubKey::parse(pubkey_str).map_err(|e| {
+ Error::StorageError(format!("invalid public key: {e}"))
+ })?);
+ }
+
+ nix_http_path_info_service.set_public_keys(pubkeys);
+ }
+ }
+
+ Box::new(nix_http_path_info_service)
+ }
+ scheme if scheme.starts_with("grpc+") => {
+ // schemes starting with grpc+ go to the GRPCPathInfoService.
+ // That's normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
+ // - In the case of unix sockets, there must be a path, but may not be a host.
+ // - In the case of non-unix sockets, there must be a host, but no path.
+ // Constructing the channel is handled by tvix_castore::tonic::channel_from_url.
+ Box::new(GRPCPathInfoService::from_client(
+ PathInfoServiceClient::with_interceptor(
+ tvix_castore::tonic::channel_from_url(&url).await?,
+ tvix_tracing::propagate::tonic::send_trace,
+ ),
+ ))
+ }
+ #[cfg(feature = "cloud")]
+ "bigtable" => {
+ use super::bigtable::BigtableParameters;
+ use super::BigtablePathInfoService;
+
+ // parse the instance name from the hostname.
+ let instance_name = url
+ .host_str()
+ .ok_or_else(|| Error::StorageError("instance name missing".into()))?
+ .to_string();
+
+ // … but add it to the query string now, so we just need to parse that.
+ url.query_pairs_mut()
+ .append_pair("instance_name", &instance_name);
+
+ let params: BigtableParameters = serde_qs::from_str(url.query().unwrap_or_default())
+ .map_err(|e| Error::InvalidRequest(format!("failed to parse parameters: {}", e)))?;
+
+ Box::new(
+ BigtablePathInfoService::connect(params)
+ .await
+ .map_err(|e| Error::StorageError(e.to_string()))?,
+ )
+ }
+ _ => Err(Error::StorageError(format!(
+ "unknown scheme: {}",
+ url.scheme()
+ )))?,
+ };
Ok(path_info_service)
}
@@ -53,12 +148,14 @@ pub async fn from_addr(
#[cfg(test)]
mod tests {
use super::from_addr;
- use crate::composition::{Composition, DeserializeWithRegistry, ServiceBuilder};
use lazy_static::lazy_static;
use rstest::rstest;
+ use std::sync::Arc;
use tempfile::TempDir;
- use tvix_castore::blobservice::{BlobService, MemoryBlobServiceConfig};
- use tvix_castore::directoryservice::{DirectoryService, MemoryDirectoryServiceConfig};
+ use tvix_castore::{
+ blobservice::{BlobService, MemoryBlobService},
+ directoryservice::{DirectoryService, MemoryDirectoryService},
+ };
lazy_static! {
static ref TMPDIR_SLED_1: TempDir = TempDir::new().unwrap();
@@ -127,19 +224,11 @@ mod tests {
)]
#[tokio::test]
async fn test_from_addr_tokio(#[case] uri_str: &str, #[case] exp_succeed: bool) {
- let mut comp = Composition::default();
- comp.extend(vec![(
- "default".into(),
- DeserializeWithRegistry(Box::new(MemoryBlobServiceConfig {})
- as Box<dyn ServiceBuilder<Output = dyn BlobService>>),
- )]);
- comp.extend(vec![(
- "default".into(),
- DeserializeWithRegistry(Box::new(MemoryDirectoryServiceConfig {})
- as Box<dyn ServiceBuilder<Output = dyn DirectoryService>>),
- )]);
-
- let resp = from_addr(uri_str, Some(&comp.context())).await;
+ let blob_service: Arc<dyn BlobService> = Arc::from(MemoryBlobService::default());
+ let directory_service: Arc<dyn DirectoryService> =
+ Arc::from(MemoryDirectoryService::default());
+
+ let resp = from_addr(uri_str, blob_service, directory_service).await;
if exp_succeed {
resp.expect("should succeed");
diff --git a/tvix/store/src/pathinfoservice/grpc.rs b/tvix/store/src/pathinfoservice/grpc.rs
index 2ac0e4330..bcee49aac 100644
--- a/tvix/store/src/pathinfoservice/grpc.rs
+++ b/tvix/store/src/pathinfoservice/grpc.rs
@@ -6,11 +6,9 @@ use crate::{
use async_stream::try_stream;
use futures::stream::BoxStream;
use nix_compat::nixbase32;
-use std::sync::Arc;
use tonic::{async_trait, Code};
use tracing::{instrument, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt;
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{proto as castorepb, Error};
/// Connects to a (remote) tvix-store PathInfoService over gRPC.
@@ -151,40 +149,6 @@ where
}
}
-#[derive(serde::Deserialize, Debug)]
-#[serde(deny_unknown_fields)]
-pub struct GRPCPathInfoServiceConfig {
- url: String,
-}
-
-impl TryFrom<url::Url> for GRPCPathInfoServiceConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(url: url::Url) -> Result<Self, Self::Error> {
- // normally grpc+unix for unix sockets, and grpc+http(s) for the HTTP counterparts.
- // - In the case of unix sockets, there must be a path, but may not be a host.
- // - In the case of non-unix sockets, there must be a host, but no path.
- // Constructing the channel is handled by tvix_castore::channel::from_url.
- Ok(GRPCPathInfoServiceConfig {
- url: url.to_string(),
- })
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for GRPCPathInfoServiceConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- _context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- let client = proto::path_info_service_client::PathInfoServiceClient::new(
- tvix_castore::tonic::channel_from_url(&self.url.parse()?).await?,
- );
- Ok(Arc::new(GRPCPathInfoService::from_client(client)))
- }
-}
-
#[cfg(test)]
mod tests {
use crate::pathinfoservice::tests::make_grpc_path_info_service_client;
diff --git a/tvix/store/src/pathinfoservice/lru.rs b/tvix/store/src/pathinfoservice/lru.rs
index 39c592bc9..da674f497 100644
--- a/tvix/store/src/pathinfoservice/lru.rs
+++ b/tvix/store/src/pathinfoservice/lru.rs
@@ -9,7 +9,6 @@ use tonic::async_trait;
use tracing::instrument;
use crate::proto::PathInfo;
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
use super::PathInfoService;
@@ -61,34 +60,6 @@ impl PathInfoService for LruPathInfoService {
}
}
-#[derive(serde::Deserialize, Debug)]
-#[serde(deny_unknown_fields)]
-pub struct LruPathInfoServiceConfig {
- capacity: NonZeroUsize,
-}
-
-impl TryFrom<url::Url> for LruPathInfoServiceConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(_url: url::Url) -> Result<Self, Self::Error> {
- Err(Error::StorageError(
- "Instantiating a LruPathInfoService from a url is not supported".into(),
- )
- .into())
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for LruPathInfoServiceConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- _context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- Ok(Arc::new(LruPathInfoService::with_capacity(self.capacity)))
- }
-}
-
#[cfg(test)]
mod test {
use std::num::NonZeroUsize;
diff --git a/tvix/store/src/pathinfoservice/memory.rs b/tvix/store/src/pathinfoservice/memory.rs
index 3fabd239c..3de3221df 100644
--- a/tvix/store/src/pathinfoservice/memory.rs
+++ b/tvix/store/src/pathinfoservice/memory.rs
@@ -7,7 +7,6 @@ use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
use tonic::async_trait;
use tracing::instrument;
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
#[derive(Default)]
@@ -60,30 +59,3 @@ impl PathInfoService for MemoryPathInfoService {
})
}
}
-
-#[derive(serde::Deserialize, Debug)]
-#[serde(deny_unknown_fields)]
-pub struct MemoryPathInfoServiceConfig {}
-
-impl TryFrom<url::Url> for MemoryPathInfoServiceConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(url: url::Url) -> Result<Self, Self::Error> {
- // memory doesn't support host or path in the URL.
- if url.has_host() || !url.path().is_empty() {
- return Err(Error::StorageError("invalid url".to_string()).into());
- }
- Ok(MemoryPathInfoServiceConfig {})
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for MemoryPathInfoServiceConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- _context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- Ok(Arc::new(MemoryPathInfoService::default()))
- }
-}
diff --git a/tvix/store/src/pathinfoservice/mod.rs b/tvix/store/src/pathinfoservice/mod.rs
index 70f752f22..574bcc0b8 100644
--- a/tvix/store/src/pathinfoservice/mod.rs
+++ b/tvix/store/src/pathinfoservice/mod.rs
@@ -14,26 +14,22 @@ mod tests;
use futures::stream::BoxStream;
use tonic::async_trait;
-use tvix_castore::composition::{Registry, ServiceBuilder};
use tvix_castore::Error;
-use crate::nar::NarCalculationService;
use crate::proto::PathInfo;
-pub use self::combinators::{
- Cache as CachePathInfoService, CacheConfig as CachePathInfoServiceConfig,
-};
+pub use self::combinators::Cache as CachePathInfoService;
pub use self::from_addr::from_addr;
-pub use self::grpc::{GRPCPathInfoService, GRPCPathInfoServiceConfig};
-pub use self::lru::{LruPathInfoService, LruPathInfoServiceConfig};
-pub use self::memory::{MemoryPathInfoService, MemoryPathInfoServiceConfig};
-pub use self::nix_http::{NixHTTPPathInfoService, NixHTTPPathInfoServiceConfig};
-pub use self::sled::{SledPathInfoService, SledPathInfoServiceConfig};
+pub use self::grpc::GRPCPathInfoService;
+pub use self::lru::LruPathInfoService;
+pub use self::memory::MemoryPathInfoService;
+pub use self::nix_http::NixHTTPPathInfoService;
+pub use self::sled::SledPathInfoService;
#[cfg(feature = "cloud")]
mod bigtable;
#[cfg(feature = "cloud")]
-pub use self::bigtable::{BigtableParameters, BigtablePathInfoService};
+pub use self::bigtable::BigtablePathInfoService;
#[cfg(any(feature = "fuse", feature = "virtiofs"))]
pub use self::fs::make_fs;
@@ -56,16 +52,12 @@ pub trait PathInfoService: Send + Sync {
/// Rust doesn't support this as a generic in traits yet. This is the same thing that
/// [async_trait] generates, but for streams instead of futures.
fn list(&self) -> BoxStream<'static, Result<PathInfo, Error>>;
-
- fn nar_calculation_service(&self) -> Option<Box<dyn NarCalculationService>> {
- None
- }
}
#[async_trait]
impl<A> PathInfoService for A
where
- A: AsRef<dyn PathInfoService> + Send + Sync + 'static,
+ A: AsRef<dyn PathInfoService> + Send + Sync,
{
async fn get(&self, digest: [u8; 20]) -> Result<Option<PathInfo>, Error> {
self.as_ref().get(digest).await
@@ -79,19 +71,3 @@ where
self.as_ref().list()
}
}
-
-/// Registers the builtin PathInfoService implementations with the registry
-pub(crate) fn register_pathinfo_services(reg: &mut Registry) {
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, CachePathInfoServiceConfig>("cache");
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, GRPCPathInfoServiceConfig>("grpc");
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, LruPathInfoServiceConfig>("lru");
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, MemoryPathInfoServiceConfig>("memory");
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, NixHTTPPathInfoServiceConfig>("nix");
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, SledPathInfoServiceConfig>("sled");
- #[cfg(feature = "cloud")]
- {
- reg.register::<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>, BigtableParameters>(
- "bigtable",
- );
- }
-}
diff --git a/tvix/store/src/pathinfoservice/nix_http.rs b/tvix/store/src/pathinfoservice/nix_http.rs
index af9234bc0..57fe37f44 100644
--- a/tvix/store/src/pathinfoservice/nix_http.rs
+++ b/tvix/store/src/pathinfoservice/nix_http.rs
@@ -7,15 +7,12 @@ use nix_compat::{
nixhash::NixHash,
};
use reqwest::StatusCode;
-use std::sync::Arc;
use tokio::io::{self, AsyncRead};
use tonic::async_trait;
use tracing::{debug, instrument, warn};
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::{
blobservice::BlobService, directoryservice::DirectoryService, proto as castorepb, Error,
};
-use url::Url;
/// NixHTTPPathInfoService acts as a bridge in between the Nix HTTP Binary cache
/// protocol provided by Nix binary caches such as cache.nixos.org, and the Tvix
@@ -252,71 +249,3 @@ where
}))
}
}
-
-#[derive(serde::Deserialize)]
-pub struct NixHTTPPathInfoServiceConfig {
- base_url: String,
- blob_service: String,
- directory_service: String,
- #[serde(default)]
- /// An optional list of [narinfo::PubKey].
- /// If set, the .narinfo files received need to have correct signature by at least one of these.
- public_keys: Option<Vec<String>>,
-}
-
-impl TryFrom<Url> for NixHTTPPathInfoServiceConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(url: Url) -> Result<Self, Self::Error> {
- let mut public_keys: Option<Vec<String>> = None;
- for (_, v) in url
- .query_pairs()
- .into_iter()
- .filter(|(k, _)| k == "trusted-public-keys")
- {
- public_keys
- .get_or_insert(Default::default())
- .extend(v.split_ascii_whitespace().map(ToString::to_string));
- }
- Ok(NixHTTPPathInfoServiceConfig {
- // Stringify the URL and remove the nix+ prefix.
- // We can't use `url.set_scheme(rest)`, as it disallows
- // setting something http(s) that previously wasn't.
- base_url: url.to_string().strip_prefix("nix+").unwrap().to_string(),
- blob_service: "default".to_string(),
- directory_service: "default".to_string(),
- public_keys,
- })
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for NixHTTPPathInfoServiceConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- let (blob_service, directory_service) = futures::join!(
- context.resolve(self.blob_service.clone()),
- context.resolve(self.directory_service.clone())
- );
- let mut svc = NixHTTPPathInfoService::new(
- Url::parse(&self.base_url)?,
- blob_service?,
- directory_service?,
- );
- if let Some(public_keys) = &self.public_keys {
- svc.set_public_keys(
- public_keys
- .iter()
- .map(|pubkey_str| {
- narinfo::PubKey::parse(pubkey_str)
- .map_err(|e| Error::StorageError(format!("invalid public key: {e}")))
- })
- .collect::<Result<Vec<_>, Error>>()?,
- );
- }
- Ok(Arc::new(svc))
- }
-}
diff --git a/tvix/store/src/pathinfoservice/sled.rs b/tvix/store/src/pathinfoservice/sled.rs
index 4255bfd1d..96ade1816 100644
--- a/tvix/store/src/pathinfoservice/sled.rs
+++ b/tvix/store/src/pathinfoservice/sled.rs
@@ -5,10 +5,8 @@ use futures::stream::BoxStream;
use nix_compat::nixbase32;
use prost::Message;
use std::path::Path;
-use std::sync::Arc;
use tonic::async_trait;
use tracing::{instrument, warn};
-use tvix_castore::composition::{CompositionContext, ServiceBuilder};
use tvix_castore::Error;
/// SledPathInfoService stores PathInfo in a [sled](https://github.com/spacejam/sled).
@@ -116,69 +114,3 @@ impl PathInfoService for SledPathInfoService {
})
}
}
-
-#[derive(serde::Deserialize)]
-#[serde(deny_unknown_fields)]
-pub struct SledPathInfoServiceConfig {
- is_temporary: bool,
- #[serde(default)]
- /// required when is_temporary = false
- path: Option<String>,
-}
-
-impl TryFrom<url::Url> for SledPathInfoServiceConfig {
- type Error = Box<dyn std::error::Error + Send + Sync>;
- fn try_from(url: url::Url) -> Result<Self, Self::Error> {
- // sled doesn't support host, and a path can be provided (otherwise
- // it'll live in memory only).
- if url.has_host() {
- return Err(Error::StorageError("no host allowed".to_string()).into());
- }
-
- // TODO: expose compression and other parameters as URL parameters?
-
- Ok(if url.path().is_empty() {
- SledPathInfoServiceConfig {
- is_temporary: true,
- path: None,
- }
- } else {
- SledPathInfoServiceConfig {
- is_temporary: false,
- path: Some(url.path().to_string()),
- }
- })
- }
-}
-
-#[async_trait]
-impl ServiceBuilder for SledPathInfoServiceConfig {
- type Output = dyn PathInfoService;
- async fn build<'a>(
- &'a self,
- _instance_name: &str,
- _context: &CompositionContext,
- ) -> Result<Arc<dyn PathInfoService>, Box<dyn std::error::Error + Send + Sync + 'static>> {
- match self {
- SledPathInfoServiceConfig {
- is_temporary: true,
- path: None,
- } => Ok(Arc::new(SledPathInfoService::new_temporary()?)),
- SledPathInfoServiceConfig {
- is_temporary: true,
- path: Some(_),
- } => Err(
- Error::StorageError("Temporary SledPathInfoService can not have path".into())
- .into(),
- ),
- SledPathInfoServiceConfig {
- is_temporary: false,
- path: None,
- } => Err(Error::StorageError("SledPathInfoService is missing path".into()).into()),
- SledPathInfoServiceConfig {
- is_temporary: false,
- path: Some(path),
- } => Ok(Arc::new(SledPathInfoService::new(path)?)),
- }
- }
-}
diff --git a/tvix/store/src/utils.rs b/tvix/store/src/utils.rs
index a09786386..d82f2214f 100644
--- a/tvix/store/src/utils.rs
+++ b/tvix/store/src/utils.rs
@@ -1,60 +1,18 @@
+use std::sync::Arc;
use std::{
- collections::HashMap,
pin::Pin,
- sync::Arc,
task::{self, Poll},
};
use tokio::io::{self, AsyncWrite};
-use tvix_castore::{blobservice::BlobService, directoryservice::DirectoryService};
+use tvix_castore::{
+ blobservice::{self, BlobService},
+ directoryservice::{self, DirectoryService},
+};
use url::Url;
-use crate::composition::{
- with_registry, Composition, DeserializeWithRegistry, ServiceBuilder, REG,
-};
use crate::nar::{NarCalculationService, SimpleRenderer};
-use crate::pathinfoservice::PathInfoService;
-
-#[derive(serde::Deserialize, Default)]
-pub struct CompositionConfigs {
- pub blobservices:
- HashMap<String, DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn BlobService>>>>,
- pub directoryservices: HashMap<
- String,
- DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn DirectoryService>>>,
- >,
- pub pathinfoservices: HashMap<
- String,
- DeserializeWithRegistry<Box<dyn ServiceBuilder<Output = dyn PathInfoService>>>,
- >,
-}
-
-pub fn addrs_to_configs(
- blob_service_addr: impl AsRef<str>,
- directory_service_addr: impl AsRef<str>,
- path_info_service_addr: impl AsRef<str>,
-) -> Result<CompositionConfigs, Box<dyn std::error::Error + Send + Sync>> {
- let mut configs: CompositionConfigs = Default::default();
-
- let blob_service_url = Url::parse(blob_service_addr.as_ref())?;
- let directory_service_url = Url::parse(directory_service_addr.as_ref())?;
- let path_info_service_url = Url::parse(path_info_service_addr.as_ref())?;
-
- configs.blobservices.insert(
- "default".into(),
- with_registry(&REG, || blob_service_url.try_into())?,
- );
- configs.directoryservices.insert(
- "default".into(),
- with_registry(&REG, || directory_service_url.try_into())?,
- );
- configs.pathinfoservices.insert(
- "default".into(),
- with_registry(&REG, || path_info_service_url.try_into())?,
- );
-
- Ok(configs)
-}
+use crate::pathinfoservice::{self, PathInfoService};
/// Construct the store handles from their addrs.
pub async fn construct_services(
@@ -65,52 +23,49 @@ pub async fn construct_services(
(
Arc<dyn BlobService>,
Arc<dyn DirectoryService>,
- Arc<dyn PathInfoService>,
+ Box<dyn PathInfoService>,
Box<dyn NarCalculationService>,
),
Box<dyn std::error::Error + Send + Sync>,
> {
- let configs = addrs_to_configs(
- blob_service_addr,
- directory_service_addr,
- path_info_service_addr,
- )?;
- construct_services_from_configs(configs).await
-}
-
-/// Construct the store handles from their addrs.
-pub async fn construct_services_from_configs(
- configs: CompositionConfigs,
-) -> Result<
- (
- Arc<dyn BlobService>,
- Arc<dyn DirectoryService>,
- Arc<dyn PathInfoService>,
- Box<dyn NarCalculationService>,
- ),
- Box<dyn std::error::Error + Send + Sync>,
-> {
- let mut comp = Composition::default();
-
- comp.extend(configs.blobservices);
- comp.extend(configs.directoryservices);
- comp.extend(configs.pathinfoservices);
-
- let blob_service: Arc<dyn BlobService> = comp.build("default").await?;
- let directory_service: Arc<dyn DirectoryService> = comp.build("default").await?;
- let path_info_service: Arc<dyn PathInfoService> = comp.build("default").await?;
+ let blob_service: Arc<dyn BlobService> =
+ blobservice::from_addr(blob_service_addr.as_ref()).await?;
+ let directory_service: Arc<dyn DirectoryService> =
+ directoryservice::from_addr(directory_service_addr.as_ref()).await?;
+
+ let path_info_service = pathinfoservice::from_addr(
+ path_info_service_addr.as_ref(),
+ blob_service.clone(),
+ directory_service.clone(),
+ )
+ .await?;
// HACK: The grpc client also implements NarCalculationService, and we
// really want to use it (otherwise we'd need to fetch everything again for hashing).
// Until we revamped store composition and config, detect this special case here.
- let nar_calculation_service: Box<dyn NarCalculationService> = path_info_service
- .nar_calculation_service()
- .unwrap_or_else(|| {
+ let nar_calculation_service: Box<dyn NarCalculationService> = {
+ use crate::pathinfoservice::GRPCPathInfoService;
+ use crate::proto::path_info_service_client::PathInfoServiceClient;
+
+ let url = Url::parse(path_info_service_addr.as_ref())
+ .map_err(|e| io::Error::other(e.to_string()))?;
+
+ if url.scheme().starts_with("grpc+") {
+ Box::new(GRPCPathInfoService::from_client(
+ PathInfoServiceClient::with_interceptor(
+ tvix_castore::tonic::channel_from_url(&url)
+ .await
+ .map_err(|e| io::Error::other(e.to_string()))?,
+ tvix_tracing::propagate::tonic::send_trace,
+ ),
+ ))
+ } else {
Box::new(SimpleRenderer::new(
blob_service.clone(),
directory_service.clone(),
- ))
- });
+ )) as Box<dyn NarCalculationService>
+ }
+ };
Ok((
blob_service,