From 484e38094cd08d860b9db9b7461d412629ee2e18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Thalheim?= Date: Fri, 10 Nov 2023 18:04:08 +0100 Subject: [PATCH] implement sharding didn't actually tested the code --- integration-tests/basic/default.nix | 4 +- server/src/storage/local.rs | 107 ++++++++++++++++++++++++++-- 2 files changed, 103 insertions(+), 8 deletions(-) diff --git a/integration-tests/basic/default.nix b/integration-tests/basic/default.nix index 2b029e1..337ff20 100644 --- a/integration-tests/basic/default.nix +++ b/integration-tests/basic/default.nix @@ -212,9 +212,9 @@ in { ${lib.optionalString (config.storage == "local") '' with subtest("Check that all chunks are actually deleted after GC"): - files = server.succeed("find /var/lib/atticd/storage -type f") + files = server.succeed("find /var/lib/atticd/storage -type f ! -name 'VERSION'") print(f"Remaining files: {files}") - assert files.strip() == "" + assert files.strip() == "", "Some files remain after GC: " + files ''} with subtest("Check that we can include the upload info in the payload"): diff --git a/server/src/storage/local.rs b/server/src/storage/local.rs index 93ff737..db9b88b 100644 --- a/server/src/storage/local.rs +++ b/server/src/storage/local.rs @@ -1,5 +1,8 @@ //! Local file storage. +use std::ffi::OsStr; +use std::os::unix::ffi::OsStrExt; +use std::path::Path; use std::path::PathBuf; use async_trait::async_trait; @@ -30,17 +33,95 @@ pub struct LocalRemoteFile { pub name: String, } +async fn read_version(storage_path: &Path) -> ServerResult { + let version_path = storage_path.join("VERSION"); + let v = match fs::read_to_string(&version_path).await { + Ok(version) => version + .trim() + .parse() + .map_err(|_| ErrorKind::StorageError(anyhow::anyhow!("Invalid version file")))?, + Err(e) if e.kind() == io::ErrorKind::NotFound => 0, + Err(e) => { + return Err(ErrorKind::StorageError(anyhow::anyhow!( + "Failed to read version file: {}", + e + )) + .into()); + } + }; + Ok(v) +} + +async fn write_version(storage_path: &Path, version: u32) -> ServerResult<()> { + let version_path = storage_path.join("VERSION"); + fs::write(&version_path, format!("{}", version)) + .await + .map_err(ServerError::storage_error)?; + Ok(()) +} + +async fn upgrade_0_to_1(storage_path: &Path) -> ServerResult<()> { + let mut files = fs::read_dir(storage_path) + .await + .map_err(ServerError::storage_error)?; + // move all files to subdirectory using the first two characters of the filename + while let Some(file) = files + .next_entry() + .await + .map_err(ServerError::storage_error)? + { + if file + .file_type() + .await + .map_err(ServerError::storage_error)? + .is_file() + { + let name = file.file_name(); + let name_bytes = name.as_os_str().as_bytes(); + let parents = storage_path + .join(OsStr::from_bytes(&name_bytes[0..1])) + .join(OsStr::from_bytes(&name_bytes[0..2])); + let new_path = parents.join(name); + fs::create_dir_all(&parents).await.map_err(|e| { + ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e)) + })?; + fs::rename(&file.path(), &new_path).await.map_err(|e| { + ErrorKind::StorageError(anyhow::anyhow!( + "Failed to move file {} to {}: {}", + file.path().display(), + new_path.display(), + e + )) + })?; + } + } + + Ok(()) +} + impl LocalBackend { pub async fn new(config: LocalStorageConfig) -> ServerResult { - fs::create_dir_all(&config.path) - .await - .map_err(ServerError::storage_error)?; + fs::create_dir_all(&config.path).await.map_err(|e| { + ErrorKind::StorageError(anyhow::anyhow!( + "Failed to create storage directory {}: {}", + config.path.display(), + e + )) + })?; + + let version = read_version(&config.path).await?; + if version == 0 { + upgrade_0_to_1(&config.path).await?; + } + write_version(&config.path, 1).await?; Ok(Self { config }) } fn get_path(&self, p: &str) -> PathBuf { - self.config.path.join(p) + let level1 = &p[0..1]; + let level2 = &p[0..2]; + self.config.path.join(level1).join(level2).join(p) } } @@ -51,9 +132,23 @@ impl StorageBackend for LocalBackend { name: String, mut stream: &mut (dyn AsyncRead + Unpin + Send), ) -> ServerResult { - let mut file = File::create(self.get_path(&name)) + let path = self.get_path(&name); + fs::create_dir_all(path.parent().unwrap()) .await - .map_err(ServerError::storage_error)?; + .map_err(|e| { + ErrorKind::StorageError(anyhow::anyhow!( + "Failed to create directory {}: {}", + path.parent().unwrap().display(), + e + )) + })?; + let mut file = File::create(self.get_path(&name)).await.map_err(|e| { + ErrorKind::StorageError(anyhow::anyhow!( + "Failed to create file {}: {}", + self.get_path(&name).display(), + e + )) + })?; io::copy(&mut stream, &mut file) .await