Merge pull request #98 from Mic92/sharding

implement sharding
This commit is contained in:
Zhaofeng Li 2024-01-01 09:17:18 -07:00 committed by GitHub
commit 9a9e2c0ce1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 103 additions and 8 deletions

View file

@ -234,9 +234,9 @@ in {
${lib.optionalString (config.storage == "local") '' ${lib.optionalString (config.storage == "local") ''
with subtest("Check that all chunks are actually deleted after GC"): with subtest("Check that all chunks are actually deleted after GC"):
files = server.succeed("find /var/lib/atticd/storage -type f") files = server.succeed("find /var/lib/atticd/storage -type f ! -name 'VERSION'")
print(f"Remaining files: {files}") print(f"Remaining files: {files}")
assert files.strip() == "" assert files.strip() == "", "Some files remain after GC: " + files
''} ''}
with subtest("Check that we can include the upload info in the payload"): with subtest("Check that we can include the upload info in the payload"):

View file

@ -1,5 +1,8 @@
//! Local file storage. //! Local file storage.
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::path::PathBuf; use std::path::PathBuf;
use async_trait::async_trait; use async_trait::async_trait;
@ -30,17 +33,95 @@ pub struct LocalRemoteFile {
pub name: String, pub name: String,
} }
async fn read_version(storage_path: &Path) -> ServerResult<u32> {
let version_path = storage_path.join("VERSION");
let v = match fs::read_to_string(&version_path).await {
Ok(version) => version
.trim()
.parse()
.map_err(|_| ErrorKind::StorageError(anyhow::anyhow!("Invalid version file")))?,
Err(e) if e.kind() == io::ErrorKind::NotFound => 0,
Err(e) => {
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Failed to read version file: {}",
e
))
.into());
}
};
Ok(v)
}
async fn write_version(storage_path: &Path, version: u32) -> ServerResult<()> {
let version_path = storage_path.join("VERSION");
fs::write(&version_path, format!("{}", version))
.await
.map_err(ServerError::storage_error)?;
Ok(())
}
async fn upgrade_0_to_1(storage_path: &Path) -> ServerResult<()> {
let mut files = fs::read_dir(storage_path)
.await
.map_err(ServerError::storage_error)?;
// move all files to subdirectory using the first two characters of the filename
while let Some(file) = files
.next_entry()
.await
.map_err(ServerError::storage_error)?
{
if file
.file_type()
.await
.map_err(ServerError::storage_error)?
.is_file()
{
let name = file.file_name();
let name_bytes = name.as_os_str().as_bytes();
let parents = storage_path
.join(OsStr::from_bytes(&name_bytes[0..1]))
.join(OsStr::from_bytes(&name_bytes[0..2]));
let new_path = parents.join(name);
fs::create_dir_all(&parents).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e))
})?;
fs::rename(&file.path(), &new_path).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to move file {} to {}: {}",
file.path().display(),
new_path.display(),
e
))
})?;
}
}
Ok(())
}
impl LocalBackend { impl LocalBackend {
pub async fn new(config: LocalStorageConfig) -> ServerResult<Self> { pub async fn new(config: LocalStorageConfig) -> ServerResult<Self> {
fs::create_dir_all(&config.path) fs::create_dir_all(&config.path).await.map_err(|e| {
.await ErrorKind::StorageError(anyhow::anyhow!(
.map_err(ServerError::storage_error)?; "Failed to create storage directory {}: {}",
config.path.display(),
e
))
})?;
let version = read_version(&config.path).await?;
if version == 0 {
upgrade_0_to_1(&config.path).await?;
}
write_version(&config.path, 1).await?;
Ok(Self { config }) Ok(Self { config })
} }
fn get_path(&self, p: &str) -> PathBuf { fn get_path(&self, p: &str) -> PathBuf {
self.config.path.join(p) let level1 = &p[0..1];
let level2 = &p[0..2];
self.config.path.join(level1).join(level2).join(p)
} }
} }
@ -51,9 +132,23 @@ impl StorageBackend for LocalBackend {
name: String, name: String,
mut stream: &mut (dyn AsyncRead + Unpin + Send), mut stream: &mut (dyn AsyncRead + Unpin + Send),
) -> ServerResult<RemoteFile> { ) -> ServerResult<RemoteFile> {
let mut file = File::create(self.get_path(&name)) let path = self.get_path(&name);
fs::create_dir_all(path.parent().unwrap())
.await .await
.map_err(ServerError::storage_error)?; .map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create directory {}: {}",
path.parent().unwrap().display(),
e
))
})?;
let mut file = File::create(self.get_path(&name)).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create file {}: {}",
self.get_path(&name).display(),
e
))
})?;
io::copy(&mut stream, &mut file) io::copy(&mut stream, &mut file)
.await .await