diff --git a/attic/src/api/v1/upload_path.rs b/attic/src/api/v1/upload_path.rs index fbe23d0..4dc3426 100644 --- a/attic/src/api/v1/upload_path.rs +++ b/attic/src/api/v1/upload_path.rs @@ -61,15 +61,24 @@ pub struct UploadPathResult { /// The compressed size of the NAR, in bytes. #[serde(skip_serializing_if = "Option::is_none")] pub file_size: Option, + + /// The fraction of data that was deduplicated, from 0 to 1. + pub frac_deduplicated: Option, } #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[non_exhaustive] pub enum UploadPathResultKind { /// The path was uploaded. + /// + /// This is purely informational and servers may return + /// this variant even when the NAR is deduplicated. Uploaded, /// The path was globally deduplicated. + /// + /// The exact semantics of what counts as deduplicated + /// is opaque to the client. Deduplicated, } diff --git a/client/src/command/push.rs b/client/src/command/push.rs index 95c1039..56383da 100644 --- a/client/src/command/push.rs +++ b/client/src/command/push.rs @@ -17,7 +17,7 @@ use crate::api::ApiClient; use crate::cache::{CacheName, CacheRef}; use crate::cli::Opts; use crate::config::Config; -use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResultKind}; +use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind}; use attic::error::AtticResult; use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo}; @@ -132,37 +132,39 @@ pub async fn upload_path( let start = Instant::now(); match api.upload_path(upload_info, nar_stream).await { Ok(r) => { - if r.is_none() { - mp.suspend(|| { - eprintln!("Warning: Please update your server. Compatibility will be removed in the first stable release."); - }) - } + let r = r.unwrap_or(UploadPathResult { + kind: UploadPathResultKind::Uploaded, + file_size: None, + frac_deduplicated: None, + }); - let deduplicated = if let Some(r) = r { - r.kind == UploadPathResultKind::Deduplicated - } else { - false + let info_string: String = match r.kind { + UploadPathResultKind::Deduplicated => "deduplicated".to_string(), + _ => { + let elapsed = start.elapsed(); + let seconds = elapsed.as_secs_f64(); + let speed = (path_info.nar_size as f64 / seconds) as u64; + + let mut s = format!("{}/s", HumanBytes(speed)); + + if let Some(frac_deduplicated) = r.frac_deduplicated { + if frac_deduplicated > 0.01f64 { + s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0); + } + } + + s + } }; - if deduplicated { - mp.suspend(|| { - eprintln!("✅ {} (deduplicated)", path.as_os_str().to_string_lossy()); - }); - bar.finish_and_clear(); - } else { - let elapsed = start.elapsed(); - let seconds = elapsed.as_secs_f64(); - let speed = (path_info.nar_size as f64 / seconds) as u64; - - mp.suspend(|| { - eprintln!( - "✅ {} ({}/s)", - path.as_os_str().to_string_lossy(), - HumanBytes(speed) - ); - }); - bar.finish_and_clear(); - } + mp.suspend(|| { + eprintln!( + "✅ {} ({})", + path.as_os_str().to_string_lossy(), + info_string + ); + }); + bar.finish_and_clear(); Ok(()) } diff --git a/server/src/api/v1/upload_path.rs b/server/src/api/v1/upload_path.rs index 59719f6..ab6d11d 100644 --- a/server/src/api/v1/upload_path.rs +++ b/server/src/api/v1/upload_path.rs @@ -62,6 +62,12 @@ enum ChunkData { Stream(Box, Hash, usize), } +/// Result of a chunk upload. +struct UploadChunkResult { + guard: ChunkGuard, + deduplicated: bool, +} + /// Applies compression to a stream, computing hashes along the way. /// /// Our strategy is to stream directly onto a UUID-keyed file on the @@ -247,6 +253,7 @@ async fn upload_path_dedup( Ok(Json(UploadPathResult { kind: UploadPathResultKind::Deduplicated, file_size: None, // TODO: Sum the chunks + frac_deduplicated: None, })) } @@ -341,15 +348,25 @@ async fn upload_path_new_chunked( } // Wait for all uploads to complete - let chunks: Vec = join_all(futures) + let chunks: Vec = join_all(futures) .await .into_iter() .map(|join_result| join_result.unwrap()) .collect::>>()?; - let file_size = chunks - .iter() - .fold(0, |acc, c| acc + c.file_size.unwrap() as usize); + let (file_size, deduplicated_size) = + chunks + .iter() + .fold((0, 0), |(file_size, deduplicated_size), c| { + ( + file_size + c.guard.file_size.unwrap() as usize, + if c.deduplicated { + deduplicated_size + c.guard.chunk_size as usize + } else { + deduplicated_size + }, + ) + }); // Finally... let txn = database @@ -385,9 +402,9 @@ async fn upload_path_new_chunked( ChunkRef::insert(chunkref::ActiveModel { nar_id: Set(nar_id), seq: Set(i as i32), - chunk_id: Set(Some(chunk.id)), - chunk_hash: Set(chunk.chunk_hash.clone()), - compression: Set(chunk.compression.clone()), + chunk_id: Set(Some(chunk.guard.id)), + chunk_hash: Set(chunk.guard.chunk_hash.clone()), + compression: Set(chunk.guard.compression.clone()), ..Default::default() }) .exec(&txn) @@ -419,6 +436,9 @@ async fn upload_path_new_chunked( Ok(Json(UploadPathResult { kind: UploadPathResultKind::Uploaded, file_size: Some(file_size), + + // Currently, frac_deduplicated is computed from size before compression + frac_deduplicated: Some(deduplicated_size as f64 / *nar_size as f64), })) } @@ -452,7 +472,7 @@ async fn upload_path_new_unchunked( state.config.require_proof_of_possession, ) .await?; - let file_size = chunk.file_size.unwrap() as usize; + let file_size = chunk.guard.file_size.unwrap() as usize; // Finally... let txn = database @@ -467,7 +487,7 @@ async fn upload_path_new_unchunked( compression: Set(compression.to_string()), nar_hash: Set(upload_info.nar_hash.to_typed_base16()), - nar_size: Set(chunk.chunk_size), + nar_size: Set(chunk.guard.chunk_size), num_chunks: Set(1), @@ -487,7 +507,7 @@ async fn upload_path_new_unchunked( ChunkRef::insert(chunkref::ActiveModel { nar_id: Set(nar_id), seq: Set(0), - chunk_id: Set(Some(chunk.id)), + chunk_id: Set(Some(chunk.guard.id)), chunk_hash: Set(upload_info.nar_hash.to_typed_base16()), compression: Set(compression.to_string()), ..Default::default() @@ -520,6 +540,7 @@ async fn upload_path_new_unchunked( Ok(Json(UploadPathResult { kind: UploadPathResultKind::Uploaded, file_size: Some(file_size), + frac_deduplicated: None, })) } @@ -533,7 +554,7 @@ async fn upload_chunk( database: DatabaseConnection, state: State, require_proof_of_possession: bool, -) -> ServerResult { +) -> ServerResult { let compression: Compression = compression_type.into(); let given_chunk_hash = data.hash(); @@ -565,7 +586,10 @@ async fn upload_chunk( } } - return Ok(existing_chunk); + return Ok(UploadChunkResult { + guard: existing_chunk, + deduplicated: true, + }); } let key = format!("{}.chunk", Uuid::new_v4()); @@ -680,7 +704,10 @@ async fn upload_chunk( let guard = ChunkGuard::from_locked(database.clone(), chunk); - Ok(guard) + Ok(UploadChunkResult { + guard, + deduplicated: false, + }) } /// Returns a compressor function that takes some stream as input.