implement sharding

didn't actually tested the code
This commit is contained in:
Jörg Thalheim 2023-11-10 18:04:08 +01:00
parent b43d12082e
commit 484e38094c
2 changed files with 103 additions and 8 deletions

View file

@ -212,9 +212,9 @@ in {
${lib.optionalString (config.storage == "local") ''
with subtest("Check that all chunks are actually deleted after GC"):
files = server.succeed("find /var/lib/atticd/storage -type f")
files = server.succeed("find /var/lib/atticd/storage -type f ! -name 'VERSION'")
print(f"Remaining files: {files}")
assert files.strip() == ""
assert files.strip() == "", "Some files remain after GC: " + files
''}
with subtest("Check that we can include the upload info in the payload"):

View file

@ -1,5 +1,8 @@
//! Local file storage.
use std::ffi::OsStr;
use std::os::unix::ffi::OsStrExt;
use std::path::Path;
use std::path::PathBuf;
use async_trait::async_trait;
@ -30,17 +33,95 @@ pub struct LocalRemoteFile {
pub name: String,
}
async fn read_version(storage_path: &Path) -> ServerResult<u32> {
let version_path = storage_path.join("VERSION");
let v = match fs::read_to_string(&version_path).await {
Ok(version) => version
.trim()
.parse()
.map_err(|_| ErrorKind::StorageError(anyhow::anyhow!("Invalid version file")))?,
Err(e) if e.kind() == io::ErrorKind::NotFound => 0,
Err(e) => {
return Err(ErrorKind::StorageError(anyhow::anyhow!(
"Failed to read version file: {}",
e
))
.into());
}
};
Ok(v)
}
async fn write_version(storage_path: &Path, version: u32) -> ServerResult<()> {
let version_path = storage_path.join("VERSION");
fs::write(&version_path, format!("{}", version))
.await
.map_err(ServerError::storage_error)?;
Ok(())
}
async fn upgrade_0_to_1(storage_path: &Path) -> ServerResult<()> {
let mut files = fs::read_dir(storage_path)
.await
.map_err(ServerError::storage_error)?;
// move all files to subdirectory using the first two characters of the filename
while let Some(file) = files
.next_entry()
.await
.map_err(ServerError::storage_error)?
{
if file
.file_type()
.await
.map_err(ServerError::storage_error)?
.is_file()
{
let name = file.file_name();
let name_bytes = name.as_os_str().as_bytes();
let parents = storage_path
.join(OsStr::from_bytes(&name_bytes[0..1]))
.join(OsStr::from_bytes(&name_bytes[0..2]));
let new_path = parents.join(name);
fs::create_dir_all(&parents).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e))
})?;
fs::rename(&file.path(), &new_path).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to move file {} to {}: {}",
file.path().display(),
new_path.display(),
e
))
})?;
}
}
Ok(())
}
impl LocalBackend {
pub async fn new(config: LocalStorageConfig) -> ServerResult<Self> {
fs::create_dir_all(&config.path)
.await
.map_err(ServerError::storage_error)?;
fs::create_dir_all(&config.path).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create storage directory {}: {}",
config.path.display(),
e
))
})?;
let version = read_version(&config.path).await?;
if version == 0 {
upgrade_0_to_1(&config.path).await?;
}
write_version(&config.path, 1).await?;
Ok(Self { config })
}
fn get_path(&self, p: &str) -> PathBuf {
self.config.path.join(p)
let level1 = &p[0..1];
let level2 = &p[0..2];
self.config.path.join(level1).join(level2).join(p)
}
}
@ -51,9 +132,23 @@ impl StorageBackend for LocalBackend {
name: String,
mut stream: &mut (dyn AsyncRead + Unpin + Send),
) -> ServerResult<RemoteFile> {
let mut file = File::create(self.get_path(&name))
let path = self.get_path(&name);
fs::create_dir_all(path.parent().unwrap())
.await
.map_err(ServerError::storage_error)?;
.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create directory {}: {}",
path.parent().unwrap().display(),
e
))
})?;
let mut file = File::create(self.get_path(&name)).await.map_err(|e| {
ErrorKind::StorageError(anyhow::anyhow!(
"Failed to create file {}: {}",
self.get_path(&name).display(),
e
))
})?;
io::copy(&mut stream, &mut file)
.await