Expose deduplication ratio to client

This commit is contained in:
Zhaofeng Li 2023-01-17 14:10:27 -07:00
parent 19111317f7
commit 5b42839f58
3 changed files with 80 additions and 42 deletions

View file

@ -61,15 +61,24 @@ pub struct UploadPathResult {
/// The compressed size of the NAR, in bytes.
#[serde(skip_serializing_if = "Option::is_none")]
pub file_size: Option<usize>,
/// The fraction of data that was deduplicated, from 0 to 1.
pub frac_deduplicated: Option<f64>,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum UploadPathResultKind {
/// The path was uploaded.
///
/// This is purely informational and servers may return
/// this variant even when the NAR is deduplicated.
Uploaded,
/// The path was globally deduplicated.
///
/// The exact semantics of what counts as deduplicated
/// is opaque to the client.
Deduplicated,
}

View file

@ -17,7 +17,7 @@ use crate::api::ApiClient;
use crate::cache::{CacheName, CacheRef};
use crate::cli::Opts;
use crate::config::Config;
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResultKind};
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind};
use attic::error::AtticResult;
use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo};
@ -132,37 +132,39 @@ pub async fn upload_path(
let start = Instant::now();
match api.upload_path(upload_info, nar_stream).await {
Ok(r) => {
if r.is_none() {
mp.suspend(|| {
eprintln!("Warning: Please update your server. Compatibility will be removed in the first stable release.");
})
}
let deduplicated = if let Some(r) = r {
r.kind == UploadPathResultKind::Deduplicated
} else {
false
};
if deduplicated {
mp.suspend(|| {
eprintln!("{} (deduplicated)", path.as_os_str().to_string_lossy());
let r = r.unwrap_or(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: None,
frac_deduplicated: None,
});
bar.finish_and_clear();
} else {
let info_string: String = match r.kind {
UploadPathResultKind::Deduplicated => "deduplicated".to_string(),
_ => {
let elapsed = start.elapsed();
let seconds = elapsed.as_secs_f64();
let speed = (path_info.nar_size as f64 / seconds) as u64;
let mut s = format!("{}/s", HumanBytes(speed));
if let Some(frac_deduplicated) = r.frac_deduplicated {
if frac_deduplicated > 0.01f64 {
s += &format!(", {:.1}% deduplicated", frac_deduplicated * 100.0);
}
}
s
}
};
mp.suspend(|| {
eprintln!(
"✅ {} ({}/s)",
"✅ {} ({})",
path.as_os_str().to_string_lossy(),
HumanBytes(speed)
info_string
);
});
bar.finish_and_clear();
}
Ok(())
}

View file

@ -62,6 +62,12 @@ enum ChunkData {
Stream(Box<dyn AsyncRead + Send + Unpin + 'static>, Hash, usize),
}
/// Result of a chunk upload.
struct UploadChunkResult {
guard: ChunkGuard,
deduplicated: bool,
}
/// Applies compression to a stream, computing hashes along the way.
///
/// Our strategy is to stream directly onto a UUID-keyed file on the
@ -247,6 +253,7 @@ async fn upload_path_dedup(
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Deduplicated,
file_size: None, // TODO: Sum the chunks
frac_deduplicated: None,
}))
}
@ -341,15 +348,25 @@ async fn upload_path_new_chunked(
}
// Wait for all uploads to complete
let chunks: Vec<ChunkGuard> = join_all(futures)
let chunks: Vec<UploadChunkResult> = join_all(futures)
.await
.into_iter()
.map(|join_result| join_result.unwrap())
.collect::<ServerResult<Vec<_>>>()?;
let file_size = chunks
let (file_size, deduplicated_size) =
chunks
.iter()
.fold(0, |acc, c| acc + c.file_size.unwrap() as usize);
.fold((0, 0), |(file_size, deduplicated_size), c| {
(
file_size + c.guard.file_size.unwrap() as usize,
if c.deduplicated {
deduplicated_size + c.guard.chunk_size as usize
} else {
deduplicated_size
},
)
});
// Finally...
let txn = database
@ -385,9 +402,9 @@ async fn upload_path_new_chunked(
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(i as i32),
chunk_id: Set(Some(chunk.id)),
chunk_hash: Set(chunk.chunk_hash.clone()),
compression: Set(chunk.compression.clone()),
chunk_id: Set(Some(chunk.guard.id)),
chunk_hash: Set(chunk.guard.chunk_hash.clone()),
compression: Set(chunk.guard.compression.clone()),
..Default::default()
})
.exec(&txn)
@ -419,6 +436,9 @@ async fn upload_path_new_chunked(
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: Some(file_size),
// Currently, frac_deduplicated is computed from size before compression
frac_deduplicated: Some(deduplicated_size as f64 / *nar_size as f64),
}))
}
@ -452,7 +472,7 @@ async fn upload_path_new_unchunked(
state.config.require_proof_of_possession,
)
.await?;
let file_size = chunk.file_size.unwrap() as usize;
let file_size = chunk.guard.file_size.unwrap() as usize;
// Finally...
let txn = database
@ -467,7 +487,7 @@ async fn upload_path_new_unchunked(
compression: Set(compression.to_string()),
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(chunk.chunk_size),
nar_size: Set(chunk.guard.chunk_size),
num_chunks: Set(1),
@ -487,7 +507,7 @@ async fn upload_path_new_unchunked(
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(0),
chunk_id: Set(Some(chunk.id)),
chunk_id: Set(Some(chunk.guard.id)),
chunk_hash: Set(upload_info.nar_hash.to_typed_base16()),
compression: Set(compression.to_string()),
..Default::default()
@ -520,6 +540,7 @@ async fn upload_path_new_unchunked(
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: Some(file_size),
frac_deduplicated: None,
}))
}
@ -533,7 +554,7 @@ async fn upload_chunk(
database: DatabaseConnection,
state: State,
require_proof_of_possession: bool,
) -> ServerResult<ChunkGuard> {
) -> ServerResult<UploadChunkResult> {
let compression: Compression = compression_type.into();
let given_chunk_hash = data.hash();
@ -565,7 +586,10 @@ async fn upload_chunk(
}
}
return Ok(existing_chunk);
return Ok(UploadChunkResult {
guard: existing_chunk,
deduplicated: true,
});
}
let key = format!("{}.chunk", Uuid::new_v4());
@ -680,7 +704,10 @@ async fn upload_chunk(
let guard = ChunkGuard::from_locked(database.clone(), chunk);
Ok(guard)
Ok(UploadChunkResult {
guard,
deduplicated: false,
})
}
/// Returns a compressor function that takes some stream as input.