diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs index c60f5db..5b1231e 100644 --- a/attic/src/api/v1/upload_path.rs +++ b/attic/src/api/v1/upload_path.rs @@ -8,10 +8,19 @@ use crate::nix_store::StorePathHash; /// Header containing the upload info. pub const ATTIC_NAR_INFO: &str = "X-Attic-Nar-Info"; +/// Header containing the size of the upload info at the beginning of the body. +pub const ATTIC_NAR_INFO_PREAMBLE_SIZE: &str = "X-Attic-Nar-Info-Preamble-Size"; + /// NAR information associated with a upload. /// -/// This is JSON-serialized as the value of the `X-Attic-Nar-Info` header. -/// The (client-compressed) NAR is the PUT body. +/// There are two ways for the client to supply the NAR information: +/// +/// 1. At the beginning of the PUT body. The `X-Attic-Nar-Info-Preamble-Size` +/// header must be set to the size of the JSON. +/// 2. Through the `X-Attic-Nar-Info` header. +/// +/// The client is advised to use the first method if the serialized +/// JSON is large (>4K). /// /// Regardless of client compression, the server will always decompress /// the NAR to validate the NAR hash before applying the server-configured diff --git a/client/src/api/mod.rs b/client/src/api/mod.rs index 4d9fe99..399627b 100644 --- a/client/src/api/mod.rs +++ b/client/src/api/mod.rs @@ -5,7 +5,10 @@ use anyhow::Result; use bytes::Bytes; use const_format::concatcp; use displaydoc::Display; -use futures::TryStream; +use futures::{ + future, + stream::{self, StreamExt, TryStream, TryStreamExt}, +}; use reqwest::{ header::{HeaderMap, HeaderValue, AUTHORIZATION, USER_AGENT}, Body, Client as HttpClient, Response, StatusCode, Url, @@ -16,7 +19,9 @@ use crate::config::ServerConfig; use crate::version::ATTIC_DISTRIBUTOR; use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest}; use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse}; -use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, ATTIC_NAR_INFO}; +use attic::api::v1::upload_path::{ + UploadPathNarInfo, UploadPathResult, ATTIC_NAR_INFO, ATTIC_NAR_INFO_PREAMBLE_SIZE, +}; use attic::cache::CacheName; use attic::nix_store::StorePathHash; @@ -24,6 +29,9 @@ use attic::nix_store::StorePathHash; const ATTIC_USER_AGENT: &str = concatcp!("Attic/{} ({})", env!("CARGO_PKG_NAME"), ATTIC_DISTRIBUTOR); +/// The size threshold to send the upload info as part of the PUT body. +const NAR_INFO_PREAMBLE_THRESHOLD: usize = 4 * 1024; // 4 KiB + /// The Attic API client. #[derive(Debug, Clone)] pub struct ApiClient { @@ -165,21 +173,34 @@ impl ApiClient { &self, nar_info: UploadPathNarInfo, stream: S, + force_preamble: bool, ) -> Result> where - S: TryStream + Send + Sync + 'static, - S::Error: Into>, - Bytes: From, + S: TryStream + Send + Sync + 'static, + S::Error: Into> + Send + Sync, { let endpoint = self.endpoint.join("_api/v1/upload-path")?; let upload_info_json = serde_json::to_string(&nar_info)?; - let req = self + let mut req = self .client .put(endpoint) - .header(ATTIC_NAR_INFO, HeaderValue::from_str(&upload_info_json)?) - .header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?) - .body(Body::wrap_stream(stream)); + .header(USER_AGENT, HeaderValue::from_str(ATTIC_USER_AGENT)?); + + if force_preamble || upload_info_json.len() >= NAR_INFO_PREAMBLE_THRESHOLD { + let preamble = Bytes::from(upload_info_json); + let preamble_len = preamble.len(); + let preamble_stream = stream::once(future::ok(preamble)); + + let chained = preamble_stream.chain(stream.into_stream()); + req = req + .header(ATTIC_NAR_INFO_PREAMBLE_SIZE, preamble_len) + .body(Body::wrap_stream(chained)); + } else { + req = req + .header(ATTIC_NAR_INFO, HeaderValue::from_str(&upload_info_json)?) + .body(Body::wrap_stream(stream)); + } let res = req.send().await?; diff --git a/client/src/command/push.rs b/client/src/command/push.rs index 56383da..0fd5a4d 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -7,9 +7,10 @@ use std::task::{Context, Poll}; use std::time::{Duration, Instant}; use anyhow::{anyhow, Result}; +use bytes::Bytes; use clap::Parser; use futures::future::join_all; -use futures::stream::Stream; +use futures::stream::{Stream, TryStreamExt}; use indicatif::{HumanBytes, MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use tokio::sync::Semaphore; @@ -41,6 +42,10 @@ pub struct Push { /// The maximum number of parallel upload processes. #[clap(short = 'j', long, default_value = "5")] jobs: usize, + + /// Always send the upload info as part of the payload. + #[clap(long, hide = true)] + force_preamble: bool, } struct PushPlan { @@ -70,6 +75,7 @@ pub async fn upload_path( api: ApiClient, cache: &CacheName, mp: MultiProgress, + force_preamble: bool, ) -> Result<()> { let path = &path_info.path; let upload_info = { @@ -127,10 +133,14 @@ pub async fn upload_path( ); let bar = mp.add(ProgressBar::new(path_info.nar_size)); bar.set_style(style); - let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()); + let nar_stream = NarStreamProgress::new(store.nar_from_path(path.to_owned()), bar.clone()) + .map_ok(Bytes::from); let start = Instant::now(); - match api.upload_path(upload_info, nar_stream).await { + match api + .upload_path(upload_info, nar_stream, force_preamble) + .await + { Ok(r) => { let r = r.unwrap_or(UploadPathResult { kind: UploadPathResultKind::Uploaded, @@ -243,7 +253,15 @@ pub async fn run(opts: Opts) -> Result<()> { async move { let permit = upload_limit.acquire().await?; - upload_path(store.clone(), path_info, api, cache, mp.clone()).await?; + upload_path( + store.clone(), + path_info, + api, + cache, + mp.clone(), + sub.force_preamble, + ) + .await?; drop(permit); Ok::<(), anyhow::Error>(()) diff --git a/integration-tests/basic/default.nix b/integration-tests/basic/default.nix index b12f0ae..b6e8264 100644 --- a/integration-tests/basic/default.nix +++ b/integration-tests/basic/default.nix @@ -9,16 +9,23 @@ let atticd = ". /etc/atticd.env && export ATTIC_SERVER_TOKEN_HS256_SECRET_BASE64 && atticd -f ${serverConfigFile}"; }; - testDrv = pkgs.writeText "test.nix" '' + makeTestDerivation = pkgs.writeShellScript "make-drv" '' + name=$1 + base=$(basename $name) + + cat >$name < $out"; exit 0; */ + /*/sh -c "echo hello > \$out"; exit 0; */ derivation { - name = "hello.txt"; - builder = ./test.nix; + name = "$base"; + builder = ./$name; system = builtins.currentSystem; preferLocalBuild = true; allowSubstitutes = false; } + EOF + + chmod +x $name ''; databaseModules = { @@ -171,7 +178,7 @@ in { client.succeed("attic cache create test") with subtest("Check that we can push a path"): - client.succeed("cat ${testDrv} >test.nix && chmod +x test.nix") + client.succeed("${makeTestDerivation} test.nix") test_file = client.succeed("nix-build --no-out-link test.nix") test_file_hash = test_file.removeprefix("/nix/store/")[:32] @@ -210,6 +217,13 @@ in { assert files.strip() == "" ''} + with subtest("Check that we can include the upload info in the payload"): + client.succeed("${makeTestDerivation} test2.nix") + test2_file = client.succeed("nix-build --no-out-link test2.nix") + client.succeed(f"attic push --force-preamble test {test2_file}") + client.succeed(f"nix-store --delete {test2_file}") + client.succeed(f"nix-store -r {test2_file}") + with subtest("Check that we can destroy the cache"): client.succeed("attic cache info test") client.succeed("attic cache destroy --no-confirm test") diff --git a/server/src/api/v1/upload_path.rs b/server/src/api/v1/upload_path.rs index 8763504..92ee60f 100644 --- a/server/src/api/v1/upload_path.rs +++ b/server/src/api/v1/upload_path.rs @@ -11,7 +11,7 @@ use axum::{ extract::{BodyStream, Extension, Json}, http::HeaderMap, }; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use chrono::Utc; use digest::Output as DigestOutput; use futures::future::join_all; @@ -34,9 +34,10 @@ use crate::narinfo::Compression; use crate::{RequestState, State}; use attic::api::v1::upload_path::{ UploadPathNarInfo, UploadPathResult, UploadPathResultKind, ATTIC_NAR_INFO, + ATTIC_NAR_INFO_PREAMBLE_SIZE, }; use attic::hash::Hash; -use attic::stream::StreamHasher; +use attic::stream::{read_chunk_async, StreamHasher}; use attic::util::Finally; use crate::chunking::chunk_stream; @@ -53,6 +54,9 @@ use crate::database::{AtticDatabase, ChunkGuard, NarGuard}; /// TODO: Make this configurable const CONCURRENT_CHUNK_UPLOADS: usize = 10; +/// The maximum size of the upload info JSON. +const MAX_NAR_INFO_SIZE: usize = 64 * 1024; // 64 KiB + type CompressorFn = Box Box + Send>; /// Data of a chunk. @@ -116,12 +120,52 @@ pub(crate) async fn upload_path( headers: HeaderMap, stream: BodyStream, ) -> ServerResult> { - let upload_info: UploadPathNarInfo = { - let header = headers - .get(ATTIC_NAR_INFO) - .ok_or_else(|| ErrorKind::RequestError(anyhow!("X-Attic-Nar-Info must be set")))?; + let mut stream = StreamReader::new( + stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))), + ); - serde_json::from_slice(header.as_bytes()).map_err(ServerError::request_error)? + let upload_info: UploadPathNarInfo = { + if let Some(preamble_size_bytes) = headers.get(ATTIC_NAR_INFO_PREAMBLE_SIZE) { + // Read from the beginning of the PUT body + let preamble_size: usize = preamble_size_bytes + .to_str() + .map_err(|_| { + ErrorKind::RequestError(anyhow!( + "{} has invalid encoding", + ATTIC_NAR_INFO_PREAMBLE_SIZE + )) + })? + .parse() + .map_err(|_| { + ErrorKind::RequestError(anyhow!( + "{} must be a valid unsigned integer", + ATTIC_NAR_INFO_PREAMBLE_SIZE + )) + })?; + + if preamble_size > MAX_NAR_INFO_SIZE { + return Err(ErrorKind::RequestError(anyhow!("Upload info is too large")).into()); + } + + let buf = BytesMut::with_capacity(preamble_size); + let preamble = read_chunk_async(&mut stream, buf) + .await + .map_err(|e| ErrorKind::RequestError(e.into()))?; + + if preamble.len() != preamble_size { + return Err(ErrorKind::RequestError(anyhow!( + "Upload info doesn't match specified size" + )) + .into()); + } + + serde_json::from_slice(&preamble).map_err(ServerError::request_error)? + } else if let Some(nar_info_bytes) = headers.get(ATTIC_NAR_INFO) { + // Read from X-Attic-Nar-Info header + serde_json::from_slice(nar_info_bytes.as_bytes()).map_err(ServerError::request_error)? + } else { + return Err(ErrorKind::RequestError(anyhow!("{} must be set", ATTIC_NAR_INFO)).into()); + } }; let cache_name = &upload_info.cache; @@ -134,10 +178,6 @@ pub(crate) async fn upload_path( }) .await?; - let stream = StreamReader::new( - stream.map(|r| r.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))), - ); - let username = req_state.auth.username().map(str::to_string); // Try to acquire a lock on an existing NAR