api/v1/upload-path: Inform the client whether the uploaded path is deduplicated

This commit is contained in:
Zhaofeng Li 2023-01-04 21:05:07 -07:00
parent a7578d1896
commit 7b53ce15cd
6 changed files with 94 additions and 30 deletions

1
Cargo.lock generated
View file

@ -153,6 +153,7 @@ dependencies = [
"regex",
"serde",
"serde_json",
"serde_with",
"serde_yaml",
"sha2",
"tempfile",

View file

@ -17,6 +17,7 @@ nix-base32 = { git = "https://github.com/zhaofengli/nix-base32.git", rev = "b850
regex = "1.7.0"
serde = { version = "1.0.151", features = ["derive"] }
serde_yaml = "0.9.16"
serde_with = "2.1.0"
sha2 = "0.10.6"
tempfile = "3"
wildmatch = "2.1.1"

View file

@ -1,4 +1,5 @@
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DefaultOnError};
use crate::cache::CacheName;
use crate::hash::Hash;
@ -50,3 +51,26 @@ pub struct UploadPathNarInfo {
/// The size of the NAR.
pub nar_size: usize,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize)]
pub struct UploadPathResult {
#[serde_as(deserialize_as = "DefaultOnError")]
pub kind: UploadPathResultKind,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum UploadPathResultKind {
/// The path was uploaded.
Uploaded,
/// The path was globally deduplicated.
Deduplicated,
}
impl Default for UploadPathResultKind {
fn default() -> Self {
Self::Uploaded
}
}

View file

@ -16,7 +16,7 @@ use crate::config::ServerConfig;
use crate::version::ATTIC_DISTRIBUTOR;
use attic::api::v1::cache_config::{CacheConfig, CreateCacheRequest};
use attic::api::v1::get_missing_paths::{GetMissingPathsRequest, GetMissingPathsResponse};
use attic::api::v1::upload_path::UploadPathNarInfo;
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult};
use attic::cache::CacheName;
use attic::nix_store::StorePathHash;
@ -155,7 +155,11 @@ impl ApiClient {
}
/// Uploads a path.
pub async fn upload_path<S>(&self, nar_info: UploadPathNarInfo, stream: S) -> Result<()>
pub async fn upload_path<S>(
&self,
nar_info: UploadPathNarInfo,
stream: S,
) -> Result<Option<UploadPathResult>>
where
S: TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn StdError + Send + Sync>>,
@ -177,7 +181,10 @@ impl ApiClient {
.await?;
if res.status().is_success() {
Ok(())
match res.json().await {
Ok(r) => Ok(Some(r)),
Err(_) => Ok(None),
}
} else {
let api_error = ApiError::try_from_response(res).await?;
Err(api_error.into())

View file

@ -17,7 +17,7 @@ use crate::api::ApiClient;
use crate::cache::{CacheName, CacheRef};
use crate::cli::Opts;
use crate::config::Config;
use attic::api::v1::upload_path::UploadPathNarInfo;
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResultKind};
use attic::error::AtticResult;
use attic::nix_store::{NixStore, StorePath, StorePathHash, ValidPathInfo};
@ -131,19 +131,39 @@ pub async fn upload_path(
let start = Instant::now();
match api.upload_path(upload_info, nar_stream).await {
Ok(_) => {
let elapsed = start.elapsed();
let seconds = elapsed.as_secs_f64();
let speed = (path_info.nar_size as f64 / seconds) as u64;
Ok(r) => {
if r.is_none() {
mp.suspend(|| {
eprintln!("Warning: Please update your server. Compatibility will be removed in the first stable release.");
})
}
let deduplicated = if let Some(r) = r {
r.kind == UploadPathResultKind::Deduplicated
} else {
false
};
if deduplicated {
mp.suspend(|| {
eprintln!("{} (deduplicated)", path.as_os_str().to_string_lossy());
});
bar.finish_and_clear();
} else {
let elapsed = start.elapsed();
let seconds = elapsed.as_secs_f64();
let speed = (path_info.nar_size as f64 / seconds) as u64;
mp.suspend(|| {
eprintln!(
"✅ {} ({}/s)",
path.as_os_str().to_string_lossy(),
HumanBytes(speed)
);
});
bar.finish_and_clear();
}
mp.suspend(|| {
eprintln!(
"✅ {} ({}/s)",
path.as_os_str().to_string_lossy(),
HumanBytes(speed)
);
});
bar.finish_and_clear();
Ok(())
}
Err(e) => {

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
use anyhow::anyhow;
use async_compression::tokio::bufread::{BrotliEncoder, XzEncoder, ZstdEncoder};
use axum::{
extract::{BodyStream, Extension},
extract::{BodyStream, Extension, Json},
http::HeaderMap,
};
use chrono::Utc;
@ -26,7 +26,7 @@ use crate::config::CompressionType;
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use crate::{RequestState, State};
use attic::api::v1::upload_path::UploadPathNarInfo;
use attic::api::v1::upload_path::{UploadPathNarInfo, UploadPathResult, UploadPathResultKind};
use attic::hash::Hash;
use attic::stream::StreamHasher;
use attic::util::Finally;
@ -34,7 +34,7 @@ use attic::util::Finally;
use crate::database::entity::cache;
use crate::database::entity::nar::{self, Entity as Nar, NarState};
use crate::database::entity::object::{self, Entity as Object};
use crate::database::entity::Json;
use crate::database::entity::Json as DbJson;
use crate::database::{AtticDatabase, NarGuard};
type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>;
@ -84,7 +84,7 @@ pub(crate) async fn upload_path(
Extension(req_state): Extension<RequestState>,
headers: HeaderMap,
stream: BodyStream,
) -> ServerResult<String> {
) -> ServerResult<Json<UploadPathResult>> {
let upload_info: UploadPathNarInfo = {
let header = headers
.get("X-Attic-Nar-Info")
@ -114,7 +114,16 @@ pub(crate) async fn upload_path(
match existing_nar {
Some(existing_nar) => {
// Deduplicate
upload_path_dedup(username, cache, upload_info, stream, database, &state, existing_nar).await
upload_path_dedup(
username,
cache,
upload_info,
stream,
database,
&state,
existing_nar,
)
.await
}
None => {
// New NAR
@ -132,7 +141,7 @@ async fn upload_path_dedup(
database: &DatabaseConnection,
state: &State,
existing_nar: NarGuard,
) -> ServerResult<String> {
) -> ServerResult<Json<UploadPathResult>> {
if state.config.require_proof_of_possession {
let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new());
tokio::io::copy(&mut stream, &mut tokio::io::sink())
@ -182,8 +191,9 @@ async fn upload_path_dedup(
// Ensure it's not unlocked earlier
drop(existing_nar);
// TODO
Ok("Success".to_string())
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Deduplicated,
}))
}
/// Uploads a path when there is no matching NAR in the global cache.
@ -198,7 +208,7 @@ async fn upload_path_new(
stream: impl AsyncRead + Send + Unpin + 'static,
database: &DatabaseConnection,
state: &State,
) -> ServerResult<String> {
) -> ServerResult<Json<UploadPathResult>> {
let compression_config = &state.config.compression;
let compression: Compression = compression_config.r#type.into();
let level = compression_config.level();
@ -228,7 +238,7 @@ async fn upload_path_new(
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(nar_size_db),
remote_file: Set(Json(remote_file)),
remote_file: Set(DbJson(remote_file)),
remote_file_id: Set(remote_file_id),
created_at: Set(Utc::now()),
@ -327,8 +337,9 @@ async fn upload_path_new(
cleanup.cancel();
// TODO
Ok("Success".to_string())
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
}))
}
impl CompressionStream {
@ -379,9 +390,9 @@ impl UploadPathNarInfoExt for UploadPathNarInfo {
object::ActiveModel {
store_path_hash: Set(self.store_path_hash.to_string()),
store_path: Set(self.store_path.clone()),
references: Set(Json(self.references.clone())),
references: Set(DbJson(self.references.clone())),
deriver: Set(self.deriver.clone()),
sigs: Set(Json(self.sigs.clone())),
sigs: Set(DbJson(self.sigs.clone())),
ca: Set(self.ca.clone()),
..Default::default()
}