Add support for chunking

Zhaofeng Li 2023-01-14 23:55:10 -07:00
parent 93a38d1b1e
commit e8f9f3c04b
33 changed files with 2035 additions and 234 deletions

11
Cargo.lock generated
View file

@ -137,8 +137,10 @@ dependencies = [
name = "attic"
version = "0.1.0"
dependencies = [
"async-stream",
"base64 0.20.0",
"bindgen",
"bytes",
"cxx",
"cxx-build",
"digest",
@ -197,6 +199,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-compression",
"async-stream",
"async-trait",
"attic",
"attic-token",
@ -213,6 +216,7 @@ dependencies = [
"digest",
"displaydoc",
"enum-as-inner",
"fastcdc",
"futures",
"hex",
"humantime",
@ -229,6 +233,7 @@ dependencies = [
"serde_with",
"sha2",
"tokio",
"tokio-test",
"tokio-util",
"toml",
"tower-http",
@ -1483,6 +1488,12 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fastcdc"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3f1596230ad22715a97a82deba0403ece9e4a458d008fd2511518d72a115bef"
[[package]]
name = "fastrand"
version = "1.8.0"

View file

@ -5,7 +5,9 @@ edition = "2021"
publish = false
[dependencies]
async-stream = { version = "0.3.3", optional = true }
base64 = "0.20.0"
bytes = "1.3.0"
displaydoc = "0.2.3"
digest = "0.10.6"
ed25519-compact = "2.0.4"
@ -56,4 +58,4 @@ nix_store = [ "dep:cxx", "dep:bindgen", "dep:cxx-build" ]
# Tokio.
#
# When disabled, any part depending on tokio is unavailable.
tokio = [ "dep:tokio" ]
tokio = [ "dep:tokio", "dep:async-stream" ]

View file

@ -1,13 +1,19 @@
//! Stream utilities.
use std::collections::VecDeque;
use std::future::Future;
use std::marker::Unpin;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use async_stream::try_stream;
use bytes::Bytes;
use digest::{Digest, Output as DigestOutput};
use futures::stream::{BoxStream, Stream, StreamExt};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
use tokio::sync::OnceCell;
use tokio::task::spawn;
/// Stream filter that hashes the bytes that have been read.
///
@ -19,6 +25,79 @@ pub struct StreamHasher<R: AsyncRead + Unpin, D: Digest + Unpin> {
finalized: Arc<OnceCell<(DigestOutput<D>, usize)>>,
}
/// Merge chunks lazily into a continuous stream.
///
/// For each chunk, a function is called to transform it into a
/// `Stream<Item = Result<Bytes>>`. This function does something like
/// opening the local file or sending a request to S3.
///
/// We call this function some time before the start of the chunk
/// is reached to eliminate delays between chunks so the merged
/// stream is smooth. We don't want to start streaming all chunks
/// at once as it's a waste of resources.
///
/// ```text
/// | S3 GET | Chunk | S3 GET | ... | S3 GET | Chunk
/// ```
///
/// ```text
/// | S3 GET | Chunk | Chunk | Chunk | Chunk
/// | S3 GET |-----------^ ^ ^
/// | S3 GET |------| |
/// | S3 GET |--------------|
///
/// ```
///
/// TODO: Support range requests so we can have seekable NARs.
pub fn merge_chunks<C, F, S, Fut, E>(
mut chunks: VecDeque<C>,
streamer: F,
streamer_arg: S,
num_prefetch: usize,
) -> Pin<Box<impl Stream<Item = Result<Bytes, E>>>>
where
F: Fn(C, S) -> Fut,
S: Clone,
Fut: Future<Output = Result<BoxStream<'static, Result<Bytes, E>>, E>> + Send + 'static,
E: Send + 'static,
{
let s = try_stream! {
let mut streams = VecDeque::new(); // a queue of JoinHandles
// otherwise type inference gets confused :/
if false {
let chunk = chunks.pop_front().unwrap();
let stream = spawn(streamer(chunk, streamer_arg.clone()));
streams.push_back(stream);
}
loop {
if let Some(stream) = streams.pop_front() {
let mut stream = stream.await.unwrap()?;
while let Some(item) = stream.next().await {
let item = item?;
yield item;
}
}
while streams.len() < num_prefetch {
if let Some(chunk) = chunks.pop_front() {
let stream = spawn(streamer(chunk, streamer_arg.clone()));
streams.push_back(stream);
} else {
break;
}
}
if chunks.is_empty() && streams.is_empty() {
// we are done!
break;
}
}
};
Box::pin(s)
}
impl<R: AsyncRead + Unpin, D: Digest + Unpin> StreamHasher<R, D> {
pub fn new(inner: R, digest: D) -> (Self, Arc<OnceCell<(DigestOutput<D>, usize)>>) {
let finalized = Arc::new(OnceCell::new());
@ -105,6 +184,9 @@ pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
mod tests {
use super::*;
use async_stream::stream;
use bytes::{BufMut, BytesMut};
use futures::future;
use tokio::io::AsyncReadExt;
use tokio_test::block_on;
@ -135,4 +217,45 @@ mod tests {
assert_eq!(expected.len(), *count);
eprintln!("finalized = {:x?}", finalized);
}
#[test]
fn test_merge_chunks() {
let chunk_a: BoxStream<Result<Bytes, ()>> = {
let s = stream! {
yield Ok(Bytes::from_static(b"Hello"));
};
Box::pin(s)
};
let chunk_b: BoxStream<Result<Bytes, ()>> = {
let s = stream! {
yield Ok(Bytes::from_static(b", "));
yield Ok(Bytes::from_static(b"world"));
};
Box::pin(s)
};
let chunk_c: BoxStream<Result<Bytes, ()>> = {
let s = stream! {
yield Ok(Bytes::from_static(b"!"));
};
Box::pin(s)
};
let chunks: VecDeque<BoxStream<'static, Result<Bytes, ()>>> =
[chunk_a, chunk_b, chunk_c].into_iter().collect();
let streamer = |c, _| future::ok(c);
let mut merged = merge_chunks(chunks, streamer, (), 2);
let bytes = block_on(async move {
let mut bytes = BytesMut::with_capacity(100);
while let Some(item) = merged.next().await {
bytes.put(item.unwrap());
}
bytes.freeze()
});
assert_eq!(&*bytes, b"Hello, world!");
}
}
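The unit test above exercises merge_chunks with chunks that are already in memory, so the prefetch window never matters. As a rough standalone sketch of the behavior the doc comment describes (nothing below is part of the commit: the 100 ms delay, the chunk contents, and the main function are made up for illustration), a streamer that sleeps before yielding shows how num_prefetch = 2 lets the next fetch overlap with streaming the current chunk:

use std::collections::VecDeque;
use std::time::Duration;
use attic::stream::merge_chunks;
use bytes::Bytes;
use futures::stream::{BoxStream, StreamExt};
#[tokio::main]
async fn main() {
    // Pretend these are the chunk records loaded from the database.
    let chunks: VecDeque<&'static [u8]> =
        [b"foo".as_slice(), b"bar".as_slice(), b"baz".as_slice()]
            .into_iter()
            .collect();
    // Simulates an S3 GET: the latency is paid before the stream starts,
    // which is exactly the gap the prefetch window is meant to hide.
    let streamer = |data: &'static [u8], _arg: ()| async move {
        tokio::time::sleep(Duration::from_millis(100)).await;
        let stream: BoxStream<'static, Result<Bytes, std::io::Error>> =
            Box::pin(futures::stream::once(async move { Ok(Bytes::from_static(data)) }));
        Ok::<_, std::io::Error>(stream)
    };
    // num_prefetch = 2: while a chunk is being streamed, the next fetch is
    // already in flight on a spawned task.
    let mut merged = merge_chunks(chunks, streamer, (), 2);
    while let Some(bytes) = merged.next().await {
        println!("got {} bytes", bytes.unwrap().len());
    }
}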

View file

@ -126,6 +126,13 @@ in {
credentialsFile = "/etc/atticd.env";
settings = {
listen = "[::]:8080";
chunking = {
nar-size-threshold = 1;
min-size = 64 * 1024;
avg-size = 128 * 1024;
max-size = 256 * 1024;
};
};
};
@ -196,6 +203,13 @@ in {
server.succeed("${cmd.atticd} --mode garbage-collector-once")
client.fail(f"curl -sL --fail-with-body http://server:8080/test/{test_file_hash}.narinfo")
${lib.optionalString (config.storage == "local") ''
with subtest("Check that all chunks are actually deleted after GC"):
files = server.succeed("find /var/lib/atticd/storage -type f")
print(f"Remaining files: {files}")
assert files.strip() == ""
''}
with subtest("Check that we can destroy the cache"):
client.succeed("attic cache info test")
client.succeed("attic cache destroy --no-confirm test")

View file

@ -23,6 +23,7 @@ attic = { path = "../attic", default-features = false, features = [ "tokio" ] }
attic-token = { path = "../token" }
anyhow = "1.0.68"
async-stream = "0.3.3"
async-trait = "0.1.60"
aws-config = "0.52.0"
aws-sdk-s3 = "0.22.0"
@ -36,6 +37,7 @@ derivative = "2.2.0"
digest = "0.10.6"
displaydoc = "0.2.3"
enum-as-inner = "0.5.1"
fastcdc = "1.0.7"
futures = "0.3.25"
hex = "0.4.3"
humantime = "2.1.0"
@ -92,3 +94,6 @@ features = [
"rt-multi-thread",
"sync",
]
[dev-dependencies]
tokio-test = "0.4.2"

View file

@ -4,7 +4,10 @@
//!
//! The implementation is based on the specifications at <https://github.com/fzakaria/nix-http-binary-cache-api-spec>.
use std::collections::VecDeque;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::path::PathBuf;
use std::sync::Arc;
use axum::{
body::StreamBody,
@ -14,19 +17,22 @@ use axum::{
routing::get,
Router,
};
use futures::stream::BoxStream;
use serde::Serialize;
use tokio_util::io::ReaderStream;
use tracing::instrument;
use crate::database::entity::chunk::ChunkModel;
use crate::database::AtticDatabase;
use crate::error::{ErrorKind, ServerResult};
use crate::narinfo::NarInfo;
use crate::nix_manifest;
use crate::storage::Download;
use crate::storage::{Download, StorageBackend};
use crate::{RequestState, State};
use attic::cache::CacheName;
use attic::mime;
use attic::nix_store::StorePathHash;
use attic::stream::merge_chunks;
/// Nix cache information.
///
@ -128,10 +134,10 @@ async fn get_store_path_info(
cache_name
);
let (object, cache, nar) = state
let (object, cache, nar, chunks) = state
.database()
.await?
.find_object_by_store_path_hash(&cache_name, &store_path_hash)
.find_object_and_chunks_by_store_path_hash(&cache_name, &store_path_hash)
.await?;
let permission = req_state
@ -141,6 +147,11 @@ async fn get_store_path_info(
req_state.set_public_cache(cache.is_public);
if chunks.iter().any(Option::is_none) {
// at least one of the chunks is missing :(
return Err(ErrorKind::IncompleteNar.into());
}
let mut narinfo = object.to_nar_info(&nar)?;
if narinfo.signature().is_none() {
@ -184,8 +195,8 @@ async fn get_nar(
let database = state.database().await?;
let (object, cache, nar) = database
.find_object_by_store_path_hash(&cache_name, &store_path_hash)
let (object, cache, _nar, chunks) = database
.find_object_and_chunks_by_store_path_hash(&cache_name, &store_path_hash)
.await?;
let permission = req_state
@ -195,18 +206,62 @@ async fn get_nar(
req_state.set_public_cache(cache.is_public);
if chunks.iter().any(Option::is_none) {
// at least one of the chunks is missing :(
return Err(ErrorKind::IncompleteNar.into());
}
database.bump_object_last_accessed(object.id).await?;
let remote_file = nar.remote_file.0;
let backend = state.storage().await?;
match backend.download_file_db(&remote_file).await? {
Download::Redirect(uri) => Ok(Redirect::temporary(&uri).into_response()),
Download::Stream(stream) => {
let stream = ReaderStream::new(stream);
let body = StreamBody::new(stream);
Ok(body.into_response())
if chunks.len() == 1 {
// single chunk
let chunk = chunks[0].as_ref().unwrap();
let remote_file = &chunk.remote_file.0;
let storage = state.storage().await?;
match storage.download_file_db(remote_file, false).await? {
Download::Url(url) => Ok(Redirect::temporary(&url).into_response()),
Download::Stream(stream) => {
let body = StreamBody::new(stream);
Ok(body.into_response())
}
Download::AsyncRead(stream) => {
let stream = ReaderStream::new(stream);
let body = StreamBody::new(stream);
Ok(body.into_response())
}
}
} else {
// reassemble NAR
fn io_error<E: std::error::Error + Send + Sync + 'static>(e: E) -> IoError {
IoError::new(IoErrorKind::Other, e)
}
let streamer = |chunk: ChunkModel, storage: Arc<Box<dyn StorageBackend + 'static>>| async move {
match storage
.download_file_db(&chunk.remote_file.0, true)
.await
.map_err(io_error)?
{
Download::Url(_) => Err(IoError::new(
IoErrorKind::Other,
"URLs not supported for NAR reassembly",
)),
Download::Stream(stream) => Ok(stream),
Download::AsyncRead(stream) => {
let stream: BoxStream<_> = Box::pin(ReaderStream::new(stream));
Ok(stream)
}
}
};
let chunks: VecDeque<_> = chunks.into_iter().map(Option::unwrap).collect();
let storage = state.storage().await?.clone();
// TODO: Make num_prefetch configurable
// The ideal size depends on the average chunk size
let merged = merge_chunks(chunks, streamer, storage, 2);
let body = StreamBody::new(merged);
Ok(body.into_response())
}
}

View file

@ -6,6 +6,7 @@ use sea_orm::{FromQueryResult, QuerySelect};
use tracing::instrument;
use crate::database::entity::cache;
use crate::database::entity::nar;
use crate::database::entity::object::{self, Entity as Object};
use crate::error::{ServerError, ServerResult};
use crate::{RequestState, State};
@ -48,8 +49,10 @@ pub(crate) async fn get_missing_paths(
.select_only()
.column_as(object::Column::StorePathHash, "store_path_hash")
.join(sea_orm::JoinType::InnerJoin, object::Relation::Cache.def())
.join(sea_orm::JoinType::InnerJoin, object::Relation::Nar.def())
.filter(cache::Column::Name.eq(payload.cache.as_str()))
.filter(object::Column::StorePathHash.is_in(query_in))
.filter(nar::Column::CompletenessHint.eq(true))
.into_model::<StorePathHashOnly>()
.all(database)
.await

View file

@ -1,23 +1,29 @@
use std::io;
use std::io::Cursor;
use std::marker::Unpin;
use std::sync::Arc;
use anyhow::anyhow;
use async_compression::tokio::bufread::{BrotliEncoder, XzEncoder, ZstdEncoder};
use async_compression::Level as CompressionLevel;
use axum::{
extract::{BodyStream, Extension, Json},
http::HeaderMap,
};
use bytes::Bytes;
use chrono::Utc;
use digest::Output as DigestOutput;
use futures::future::join_all;
use futures::StreamExt;
use sea_orm::entity::prelude::*;
use sea_orm::sea_query::Expr;
use sea_orm::ActiveValue::Set;
use sea_orm::TransactionTrait;
use sea_orm::{QuerySelect, TransactionTrait};
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, BufReader};
use tokio::sync::OnceCell;
use tokio::io::{AsyncBufRead, AsyncRead, BufReader};
use tokio::sync::{OnceCell, Semaphore};
use tokio::task::spawn;
use tokio_util::io::StreamReader;
use tracing::instrument;
use uuid::Uuid;
@ -31,14 +37,31 @@ use attic::hash::Hash;
use attic::stream::StreamHasher;
use attic::util::Finally;
use crate::chunking::chunk_stream;
use crate::database::entity::cache;
use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
use crate::database::entity::chunkref::{self, Entity as ChunkRef};
use crate::database::entity::nar::{self, Entity as Nar, NarState};
use crate::database::entity::object::{self, Entity as Object};
use crate::database::entity::Json as DbJson;
use crate::database::{AtticDatabase, NarGuard};
use crate::database::{AtticDatabase, ChunkGuard, NarGuard};
/// Number of chunks to upload to the storage backend at once.
///
/// TODO: Make this configurable
const CONCURRENT_CHUNK_UPLOADS: usize = 10;
type CompressorFn<C> = Box<dyn FnOnce(C) -> Box<dyn AsyncRead + Unpin + Send> + Send>;
/// Data of a chunk.
enum ChunkData {
/// Some bytes in memory.
Bytes(Bytes),
/// A stream with a user-claimed hash and size that are potentially incorrect.
Stream(Box<dyn AsyncRead + Send + Unpin + 'static>, Hash, usize),
}
/// Applies compression to a stream, computing hashes along the way.
///
/// Our strategy is to stream directly onto a UUID-keyed file on the
@ -112,18 +135,33 @@ pub(crate) async fn upload_path(
// Try to acquire a lock on an existing NAR
let existing_nar = database.find_and_lock_nar(&upload_info.nar_hash).await?;
match existing_nar {
// FIXME: existing NAR may be missing chunks
Some(existing_nar) => {
// Deduplicate
upload_path_dedup(
username,
cache,
upload_info,
stream,
database,
&state,
existing_nar,
)
.await
// Deduplicate?
let missing_chunk = ChunkRef::find()
.filter(chunkref::Column::NarId.eq(existing_nar.id))
.filter(chunkref::Column::ChunkId.is_null())
.limit(1)
.one(database)
.await
.map_err(ServerError::database_error)?;
if missing_chunk.is_some() {
// Need to repair
upload_path_new(username, cache, upload_info, stream, database, &state).await
} else {
// Can actually be deduplicated
upload_path_dedup(
username,
cache,
upload_info,
stream,
database,
&state,
existing_nar,
)
.await
}
}
None => {
// New NAR
@ -161,10 +199,6 @@ async fn upload_path_dedup(
}
}
let file_size = existing_nar.file_size
.map(|dbs| dbs.try_into().map_err(ServerError::database_error))
.transpose()?;
// Finally...
let txn = database
.begin()
@ -197,7 +231,7 @@ async fn upload_path_dedup(
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Deduplicated,
file_size,
file_size: None, // TODO: Sum the chunks
}))
}
@ -214,105 +248,232 @@ async fn upload_path_new(
database: &DatabaseConnection,
state: &State,
) -> ServerResult<Json<UploadPathResult>> {
let nar_size_threshold = state.config.chunking.nar_size_threshold;
if nar_size_threshold == 0 || upload_info.nar_size < nar_size_threshold {
upload_path_new_unchunked(username, cache, upload_info, stream, database, state).await
} else {
upload_path_new_chunked(username, cache, upload_info, stream, database, state).await
}
}
/// Uploads a path when there is no matching NAR in the global cache (chunked).
async fn upload_path_new_chunked(
username: Option<String>,
cache: cache::Model,
upload_info: UploadPathNarInfo,
stream: impl AsyncRead + Send + Unpin + 'static,
database: &DatabaseConnection,
state: &State,
) -> ServerResult<Json<UploadPathResult>> {
let chunking_config = &state.config.chunking;
let compression_config = &state.config.compression;
let compression: Compression = compression_config.r#type.into();
let level = compression_config.level();
let compressor: CompressorFn<_> = match compression_config.r#type {
CompressionType::None => Box::new(|c| Box::new(c)),
CompressionType::Brotli => {
Box::new(move |s| Box::new(BrotliEncoder::with_quality(s, level)))
}
CompressionType::Zstd => Box::new(move |s| Box::new(ZstdEncoder::with_quality(s, level))),
CompressionType::Xz => Box::new(move |s| Box::new(XzEncoder::with_quality(s, level))),
};
let compression_type = compression_config.r#type;
let compression_level = compression_config.level();
let compression: Compression = compression_type.into();
let backend = state.storage().await?;
let nar_size_db = i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?;
let key = format!("{}.nar", Uuid::new_v4());
// FIXME: Maybe the client will send much more data than claimed
let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new());
let mut chunks = chunk_stream(
stream,
chunking_config.min_size,
chunking_config.avg_size,
chunking_config.max_size,
);
let remote_file = backend.make_db_reference(key.clone()).await?;
let remote_file_id = remote_file.remote_file_id();
let nar_id = {
let nar_size_db =
i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?;
let model = nar::ActiveModel {
state: Set(NarState::PendingUpload),
compression: Set(compression.to_string()),
let upload_chunk_limit = Arc::new(Semaphore::new(CONCURRENT_CHUNK_UPLOADS));
let mut futures = Vec::new();
// Untrusted data - To be confirmed later
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(nar_size_db),
while let Some(bytes) = chunks.next().await {
let bytes = bytes.map_err(ServerError::request_error)?;
let data = ChunkData::Bytes(bytes);
remote_file: Set(DbJson(remote_file)),
remote_file_id: Set(remote_file_id),
// Wait for a permit before spawning
//
// We want to block the receive process as well; otherwise it stays ahead and
// consumes too much memory
let permit = upload_chunk_limit.clone().acquire_owned().await.unwrap();
futures.push({
let database = database.clone();
let state = state.clone();
let require_proof_of_possession = state.config.require_proof_of_possession;
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Nar::insert(model)
.exec(database)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
let cleanup = Finally::new({
let database = database.clone();
let nar_model = nar::ActiveModel {
id: Set(nar_id),
..Default::default()
};
let backend = backend.clone();
let key = key.clone();
async move {
tracing::warn!("Error occurred - Cleaning up uploaded file and NAR entry");
if let Err(e) = backend.delete_file(key).await {
tracing::warn!("Failed to clean up failed upload: {}", e);
}
if let Err(e) = Nar::delete(nar_model).exec(&database).await {
tracing::warn!("Failed to unregister failed NAR: {}", e);
}
}
});
let mut stream = CompressionStream::new(stream, compressor);
// Stream the object to the storage backend
backend
.upload_file(key, stream.stream())
.await
.map_err(ServerError::storage_error)?;
spawn(async move {
let chunk = upload_chunk(
data,
compression_type,
compression_level,
database,
state,
require_proof_of_possession,
)
.await?;
drop(permit);
Ok(chunk)
})
});
}
// Confirm that the NAR Hash and Size are correct
// FIXME: errors
let (nar_hash, nar_size) = stream.nar_hash_and_size().unwrap();
let (file_hash, file_size) = stream.file_hash_and_size().unwrap();
let (nar_hash, nar_size) = nar_compute.get().unwrap();
let nar_hash = Hash::Sha256(nar_hash.as_slice().try_into().unwrap());
let file_hash = Hash::Sha256(file_hash.as_slice().try_into().unwrap());
if nar_hash != upload_info.nar_hash || *nar_size != upload_info.nar_size {
return Err(ErrorKind::RequestError(anyhow!("Bad NAR Hash or Size")).into());
}
// Wait for all uploads to complete
let chunks: Vec<ChunkGuard> = join_all(futures)
.await
.into_iter()
.map(|join_result| join_result.unwrap())
.collect::<ServerResult<Vec<_>>>()?;
let file_size = chunks
.iter()
.fold(0, |acc, c| acc + c.file_size.unwrap() as usize);
// Finally...
let txn = database
.begin()
.await
.map_err(ServerError::database_error)?;
// Update the file hash and size, and set the nar to valid
let file_size_db = i64::try_from(*file_size).map_err(ServerError::request_error)?;
Nar::update(nar::ActiveModel {
id: Set(nar_id),
state: Set(NarState::Valid),
file_hash: Set(Some(file_hash.to_typed_base16())),
file_size: Set(Some(file_size_db)),
// Create a NAR entry
let nar_id = {
let model = nar::ActiveModel {
state: Set(NarState::Valid),
compression: Set(compression.to_string()),
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(nar_size_db),
num_chunks: Set(chunks.len() as i32),
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Nar::insert(model)
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
// Create mappings from the NAR to the chunks
for (i, chunk) in chunks.iter().enumerate() {
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(i as i32),
chunk_id: Set(Some(chunk.id)),
chunk_hash: Set(chunk.chunk_hash.clone()),
compression: Set(chunk.compression.clone()),
..Default::default()
})
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
}
// Create a mapping granting the local cache access to the NAR
Object::delete_many()
.filter(object::Column::CacheId.eq(cache.id))
.filter(object::Column::StorePathHash.eq(upload_info.store_path_hash.to_string()))
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
Object::insert({
let mut new_object = upload_info.to_active_model();
new_object.cache_id = Set(cache.id);
new_object.nar_id = Set(nar_id);
new_object.created_at = Set(Utc::now());
new_object.created_by = Set(username);
new_object
})
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
txn.commit().await.map_err(ServerError::database_error)?;
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: Some(file_size),
}))
}
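A note on the upload loop above: taking a semaphore permit before spawning is what bounds both the number of in-flight chunk uploads and how far ahead the chunk producer can run. The same backpressure pattern in isolation, as a hypothetical sketch (fake_upload, the chunk sizes, and the constant are all made up):

use std::sync::Arc;
use futures::future::join_all;
use tokio::sync::Semaphore;
use tokio::task::spawn;
const CONCURRENT_UPLOADS: usize = 10;
// Stand-in for the real storage upload.
async fn fake_upload(chunk: Vec<u8>) -> Result<usize, std::io::Error> {
    Ok(chunk.len())
}
#[tokio::main]
async fn main() {
    let limit = Arc::new(Semaphore::new(CONCURRENT_UPLOADS));
    let mut futures = Vec::new();
    for chunk in (0..100).map(|i| vec![0u8; i]) {
        // Blocks the producer until a slot frees up; the permit is held for
        // the lifetime of the spawned upload and released with `drop`.
        let permit = limit.clone().acquire_owned().await.unwrap();
        futures.push(spawn(async move {
            let result = fake_upload(chunk).await;
            drop(permit);
            result
        }));
    }
    // Wait for all uploads, unwrapping the JoinHandle layer first.
    let sizes: Result<Vec<usize>, _> = join_all(futures)
        .await
        .into_iter()
        .map(|join_result| join_result.unwrap())
        .collect();
    println!("uploaded {} chunks", sizes.unwrap().len());
}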
/// Uploads a path when there is no matching NAR in the global cache (unchunked).
///
/// We upload the entire NAR as a single chunk.
async fn upload_path_new_unchunked(
username: Option<String>,
cache: cache::Model,
upload_info: UploadPathNarInfo,
stream: impl AsyncRead + Send + Unpin + 'static,
database: &DatabaseConnection,
state: &State,
) -> ServerResult<Json<UploadPathResult>> {
let compression_config = &state.config.compression;
let compression_type = compression_config.r#type;
let compression: Compression = compression_type.into();
// Upload the entire NAR as a single chunk
let data = ChunkData::Stream(
Box::new(stream),
upload_info.nar_hash.clone(),
upload_info.nar_size,
);
let chunk = upload_chunk(
data,
compression_type,
compression_config.level(),
database.clone(),
state.clone(),
state.config.require_proof_of_possession,
)
.await?;
let file_size = chunk.file_size.unwrap() as usize;
// Finally...
let txn = database
.begin()
.await
.map_err(ServerError::database_error)?;
// Create a NAR entry
let nar_id = {
let model = nar::ActiveModel {
state: Set(NarState::Valid),
compression: Set(compression.to_string()),
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(chunk.chunk_size),
num_chunks: Set(1),
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Nar::insert(model)
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
// Create a mapping from the NAR to the chunk
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(0),
chunk_id: Set(Some(chunk.id)),
chunk_hash: Set(upload_info.nar_hash.to_typed_base16()),
..Default::default()
})
.exec(&txn)
@ -340,15 +501,225 @@ async fn upload_path_new(
txn.commit().await.map_err(ServerError::database_error)?;
cleanup.cancel();
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: Some(*file_size),
file_size: Some(file_size),
}))
}
/// Uploads a chunk with the desired compression.
///
/// This will automatically perform deduplication if the chunk exists.
async fn upload_chunk(
data: ChunkData,
compression_type: CompressionType,
compression_level: CompressionLevel,
database: DatabaseConnection,
state: State,
require_proof_of_possession: bool,
) -> ServerResult<ChunkGuard> {
let compression: Compression = compression_type.into();
let given_chunk_hash = data.hash();
let given_chunk_size = data.size();
if let Some(existing_chunk) = database
.find_and_lock_chunk(&given_chunk_hash, compression)
.await?
{
// There's an existing chunk matching the hash
if require_proof_of_possession && !data.is_hash_trusted() {
let stream = data.into_async_read();
let (mut stream, nar_compute) = StreamHasher::new(stream, Sha256::new());
tokio::io::copy(&mut stream, &mut tokio::io::sink())
.await
.map_err(ServerError::request_error)?;
// FIXME: errors
let (nar_hash, nar_size) = nar_compute.get().unwrap();
let nar_hash = Hash::Sha256(nar_hash.as_slice().try_into().unwrap());
// Confirm that the NAR Hash and Size are correct
if nar_hash.to_typed_base16() != existing_chunk.chunk_hash
|| *nar_size != given_chunk_size
|| *nar_size != existing_chunk.chunk_size as usize
{
return Err(ErrorKind::RequestError(anyhow!("Bad chunk hash or size")).into());
}
}
return Ok(existing_chunk);
}
let key = format!("{}.chunk", Uuid::new_v4());
let backend = state.storage().await?;
let remote_file = backend.make_db_reference(key.clone()).await?;
let remote_file_id = remote_file.remote_file_id();
let chunk_size_db = i64::try_from(given_chunk_size).map_err(ServerError::request_error)?;
let chunk_id = {
let model = chunk::ActiveModel {
state: Set(ChunkState::PendingUpload),
compression: Set(compression.to_string()),
// Untrusted data - To be confirmed later
chunk_hash: Set(given_chunk_hash.to_typed_base16()),
chunk_size: Set(chunk_size_db),
remote_file: Set(DbJson(remote_file)),
remote_file_id: Set(remote_file_id),
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Chunk::insert(model)
.exec(&database)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
let cleanup = Finally::new({
let database = database.clone();
let chunk_model = chunk::ActiveModel {
id: Set(chunk_id),
..Default::default()
};
let backend = backend.clone();
let key = key.clone();
async move {
tracing::warn!("Error occurred - Cleaning up uploaded file and chunk entry");
if let Err(e) = backend.delete_file(key).await {
tracing::warn!("Failed to clean up failed upload: {}", e);
}
if let Err(e) = Chunk::delete(chunk_model).exec(&database).await {
tracing::warn!("Failed to unregister failed chunk: {}", e);
}
}
});
// Compress and stream to the storage backend
let compressor = get_compressor_fn(compression_type, compression_level);
let mut stream = CompressionStream::new(data.into_async_read(), compressor);
backend
.upload_file(key, stream.stream())
.await
.map_err(ServerError::storage_error)?;
// Confirm that the chunk hash is correct
let (chunk_hash, chunk_size) = stream.nar_hash_and_size().unwrap();
let (file_hash, file_size) = stream.file_hash_and_size().unwrap();
let chunk_hash = Hash::Sha256(chunk_hash.as_slice().try_into().unwrap());
let file_hash = Hash::Sha256(file_hash.as_slice().try_into().unwrap());
if chunk_hash != given_chunk_hash || *chunk_size != given_chunk_size {
return Err(ErrorKind::RequestError(anyhow!("Bad chunk hash or size")).into());
}
// Finally...
let txn = database
.begin()
.await
.map_err(ServerError::database_error)?;
// Update the file hash and size, and set the chunk to valid
let file_size_db = i64::try_from(*file_size).map_err(ServerError::request_error)?;
let chunk = Chunk::update(chunk::ActiveModel {
id: Set(chunk_id),
state: Set(ChunkState::Valid),
file_hash: Set(Some(file_hash.to_typed_base16())),
file_size: Set(Some(file_size_db)),
holders_count: Set(1),
..Default::default()
})
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
// Also repair broken chunk references pointing at the same chunk
let repaired = ChunkRef::update_many()
.col_expr(chunkref::Column::ChunkId, Expr::value(chunk_id))
.filter(chunkref::Column::ChunkId.is_null())
.filter(chunkref::Column::ChunkHash.eq(chunk_hash.to_typed_base16()))
.filter(chunkref::Column::Compression.eq(compression.to_string()))
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
txn.commit().await.map_err(ServerError::database_error)?;
cleanup.cancel();
tracing::debug!("Repaired {} chunkrefs", repaired.rows_affected);
let guard = ChunkGuard::from_locked(database.clone(), chunk);
Ok(guard)
}
/// Returns a compressor function that takes some stream as input.
fn get_compressor_fn<C: AsyncBufRead + Unpin + Send + 'static>(
ctype: CompressionType,
level: CompressionLevel,
) -> CompressorFn<C> {
match ctype {
CompressionType::None => Box::new(|c| Box::new(c)),
CompressionType::Brotli => {
Box::new(move |s| Box::new(BrotliEncoder::with_quality(s, level)))
}
CompressionType::Zstd => Box::new(move |s| Box::new(ZstdEncoder::with_quality(s, level))),
CompressionType::Xz => Box::new(move |s| Box::new(XzEncoder::with_quality(s, level))),
}
}
impl ChunkData {
/// Returns the potentially-incorrect hash of the chunk.
fn hash(&self) -> Hash {
match self {
Self::Bytes(bytes) => {
let mut hasher = Sha256::new();
hasher.update(bytes);
let hash = hasher.finalize();
Hash::Sha256(hash.as_slice().try_into().unwrap())
}
Self::Stream(_, hash, _) => hash.clone(),
}
}
/// Returns the potentially-incorrect size of the chunk.
fn size(&self) -> usize {
match self {
Self::Bytes(bytes) => bytes.len(),
Self::Stream(_, _, size) => *size,
}
}
/// Returns whether the hash is trusted.
fn is_hash_trusted(&self) -> bool {
matches!(self, ChunkData::Bytes(_))
}
/// Turns the data into a stream.
fn into_async_read(self) -> Box<dyn AsyncRead + Unpin + Send> {
match self {
Self::Bytes(bytes) => Box::new(Cursor::new(bytes)),
Self::Stream(stream, _, _) => stream,
}
}
}
impl CompressionStream {
/// Creates a new compression stream.
fn new<R>(stream: R, compressor: CompressorFn<BufReader<StreamHasher<R, Sha256>>>) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
@ -369,6 +740,29 @@ impl CompressionStream {
}
}
/*
/// Creates a compression stream without computing the uncompressed hash/size.
///
/// This is useful if you already know the hash. `nar_hash_and_size` will
/// always return `None`.
fn new_without_nar_hash<R>(stream: R, compressor: CompressorFn<BufReader<R>>) -> Self
where
R: AsyncRead + Unpin + Send + 'static,
{
// compress NAR
let stream = compressor(BufReader::new(stream));
// compute file hash and size
let (stream, file_compute) = StreamHasher::new(stream, Sha256::new());
Self {
stream: Box::new(stream),
nar_compute: Arc::new(OnceCell::new()),
file_compute,
}
}
*/
/// Returns the stream of the compressed object.
fn stream(&mut self) -> &mut (impl AsyncRead + Unpin) {
&mut self.stream

123
server/src/chunking/mod.rs Normal file
View file

@ -0,0 +1,123 @@
//! Chunking.
//!
//! We perform chunking on uncompressed NARs using the FastCDC
//! algorithm.
use async_stream::try_stream;
use bytes::{BufMut, Bytes, BytesMut};
use fastcdc::FastCDC;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Splits a stream into content-defined chunks.
///
/// This is a wrapper over fastcdc-rs that takes an `AsyncRead` and
/// returns a `Stream` of chunks as `Bytes`s.
pub fn chunk_stream<R>(
mut stream: R,
min_size: usize,
avg_size: usize,
max_size: usize,
) -> impl Stream<Item = std::io::Result<Bytes>>
where
R: AsyncRead + Unpin + Send,
{
let s = try_stream! {
let mut buf = BytesMut::with_capacity(max_size);
loop {
let read = read_chunk_async(&mut stream, buf).await?;
let mut eof = false;
if read.len() == 0 {
// Already EOF
break;
} else if read.len() < max_size {
// Last read
eof = true;
}
let chunks = FastCDC::with_eof(&read, min_size, avg_size, max_size, eof);
let mut consumed = 0;
for chunk in chunks {
consumed += chunk.length;
let slice = read.slice(chunk.offset..chunk.offset + chunk.length);
yield slice;
}
if eof {
break;
}
buf = BytesMut::with_capacity(max_size);
if consumed < read.len() {
// remaining bytes for the next read
buf.put_slice(&read[consumed..]);
}
}
};
Box::pin(s)
}
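For reference, the fastcdc iterator that chunk_stream wraps can also be used synchronously on a buffer that is already in memory; the wrapper above only adds the async read loop and EOF handling (it uses FastCDC::with_eof so cut points behave correctly at buffer boundaries, while the sketch below uses FastCDC::new on a complete buffer). A minimal sketch, with arbitrary sizes and a hypothetical function name:

fn split_in_memory(data: &[u8]) {
    // min / avg / max cut-point sizes in bytes.
    let chunker = fastcdc::FastCDC::new(data, 16 * 1024, 32 * 1024, 64 * 1024);
    for chunk in chunker {
        // Each item is an offset/length pair into `data`. Identical content
        // tends to produce the same cut points regardless of what surrounds
        // it, which is what makes chunk-level deduplication effective.
        println!("chunk at {} ({} bytes)", chunk.offset, chunk.length);
    }
}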
async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
stream: &mut S,
mut chunk: BytesMut,
) -> std::io::Result<Bytes> {
while chunk.len() < chunk.capacity() {
let read = stream.read_buf(&mut chunk).await?;
if read == 0 {
break;
}
}
Ok(chunk.freeze())
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use futures::StreamExt;
use tokio_test::block_on;
/// Chunks and reconstructs a file.
#[test]
fn test_chunking_basic() {
block_on(async move {
let test_file = get_data(32 * 1024 * 1024); // 32 MiB
let mut reconstructed_file = Vec::new();
let cursor = Cursor::new(&test_file);
let mut chunks = chunk_stream(cursor, 8 * 1024, 16 * 1024, 32 * 1024);
while let Some(chunk) = chunks.next().await {
let chunk = chunk.unwrap();
eprintln!("Got a {}-byte chunk", chunk.len());
reconstructed_file.extend(chunk);
}
assert_eq!(reconstructed_file, test_file);
});
}
/// Returns some fake data.
fn get_data(len: usize) -> Vec<u8> {
let mut state = 42u32;
let mut data = vec![0u8; len];
for i in 0..data.len() {
(state, _) = state.overflowing_mul(1664525u32);
(state, _) = state.overflowing_add(1013904223u32);
data[i] = ((state >> (i % 24)) & 0xff) as u8;
}
data
}
}

View file

@ -86,6 +86,28 @@ path = "%storage_path%"
# access_key_id = ""
# secret_access_key = ""
# Data chunking
#
# Warning: If you change any of the values here, it will be
# difficult to reuse existing chunks for newly-uploaded NARs
# since the cutpoints will be different. As a result, the
# deduplication ratio will suffer for a while after the change.
[chunking]
# The minimum NAR size to trigger chunking
#
# If 0, chunking is disabled entirely for newly-uploaded NARs.
# If 1, all NARs are chunked.
nar-size-threshold = 65536 # chunk files that are 64 KiB or larger
# The preferred minimum size of a chunk, in bytes
min-size = 16384 # 16 KiB
# The preferred average size of a chunk, in bytes
avg-size = 65536 # 64 KiB
# The preferred maximum size of a chunk, in bytes
max-size = 262144 # 256 KiB
# Compression
[compression]
# Compression type

View file

@ -92,6 +92,9 @@ pub struct Config {
/// Storage.
pub storage: StorageConfig,
/// Data chunking.
pub chunking: ChunkingConfig,
/// Compression.
#[serde(default = "Default::default")]
pub compression: CompressionConfig,
@ -137,6 +140,46 @@ pub enum StorageConfig {
S3(S3StorageConfig),
}
/// Data chunking.
///
/// This must be set, but a default set of values is provided
/// through the OOBE sequence. The reason is that this allows
/// us to provide a new set of recommended "defaults" for newer
/// deployments without affecting existing ones.
///
/// Warning: If you change any of the values here, it will be
/// difficult to reuse existing chunks for newly-uploaded NARs
/// since the cutpoints will be different. As a result, the
/// deduplication ratio will suffer for a while after the change.
///
/// `atticadm test-chunking` provides a way to test chunking
/// on a set of files so you can fine-tune the values.
#[derive(Debug, Clone, Deserialize)]
pub struct ChunkingConfig {
/// The minimum NAR size to trigger chunking.
///
/// If 0, chunking is disabled entirely for newly-uploaded
/// NARs.
///
/// If 1, all newly-uploaded NARs are chunked.
///
/// By default, the threshold is 128KB.
#[serde(rename = "nar-size-threshold")]
pub nar_size_threshold: usize,
/// The preferred minimum size of a chunk, in bytes.
#[serde(rename = "min-size")]
pub min_size: usize,
/// The preferred average size of a chunk, in bytes.
#[serde(rename = "avg-size")]
pub avg_size: usize,
/// The preferred maximum size of a chunk, in bytes.
#[serde(rename = "max-size")]
pub max_size: usize,
}
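Because every field is renamed to kebab-case, the [chunking] table from the configuration template deserializes directly into this struct. A small sketch using the template's values (assumes the toml crate, which the server already depends on; the function name is made up):

fn parse_chunking_example() -> ChunkingConfig {
    let cfg: ChunkingConfig = toml::from_str(
        "nar-size-threshold = 65536\n\
         min-size = 16384\n\
         avg-size = 65536\n\
         max-size = 262144\n",
    )
    .unwrap();
    assert_eq!(cfg.nar_size_threshold, 65536);
    cfg
}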
/// Compression configuration.
#[derive(Debug, Clone, Deserialize)]
pub struct CompressionConfig {
@ -294,7 +337,7 @@ fn load_config_from_str(s: &str) -> Result<Config> {
/// Loads the configuration in the standard order.
pub async fn load_config(config_path: Option<&Path>, allow_oobe: bool) -> Result<Config> {
if let Some(config_path) = config_path {
load_config_from_path(&config_path)
load_config_from_path(config_path)
} else if let Ok(config_env) = env::var(ENV_CONFIG_BASE64) {
let decoded = String::from_utf8(base64::decode(config_env.as_bytes())?)?;
load_config_from_str(&decoded)

View file

@ -0,0 +1,111 @@
//! A content-addressed chunk in the global chunk store.
use sea_orm::entity::prelude::*;
use super::Json;
use crate::storage::RemoteFile;
pub type ChunkModel = Model;
/// The state of a chunk.
#[derive(EnumIter, DeriveActiveEnum, Debug, Clone, PartialEq, Eq)]
#[sea_orm(rs_type = "String", db_type = "String(Some(1))")]
pub enum ChunkState {
/// The chunk can be used.
///
/// The raw and compressed hashes are available.
#[sea_orm(string_value = "V")]
Valid,
/// The chunk is a pending upload.
///
/// The raw and compressed hashes may not be available.
#[sea_orm(string_value = "P")]
PendingUpload,
/// The chunk can be deleted because it already exists.
///
/// This state can be transitioned into from `PendingUpload`
/// if some other client completes uploading the same chunk
/// faster.
#[sea_orm(string_value = "C")]
ConfirmedDeduplicated,
/// The chunk is being deleted.
///
/// This row will be deleted shortly.
#[sea_orm(string_value = "D")]
Deleted,
}
/// A content-addressed chunk in the global cache.
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "chunk")]
pub struct Model {
/// Unique numeric ID of the chunk.
#[sea_orm(primary_key)]
pub id: i64,
/// The state of the chunk.
state: ChunkState,
/// The hash of the uncompressed chunk.
///
/// This always begins with "sha256:" with the hash in the
/// hexadecimal format.
///
/// The global chunk store may have several chunks with the same
/// hash:
///
/// - Racing uploads from different clients
/// - Different compression methods
#[sea_orm(indexed)]
pub chunk_hash: String,
/// The size of the uncompressed chunk.
pub chunk_size: i64,
/// The hash of the compressed chunk.
///
/// This always begins with "sha256:" with the hash in the
/// hexadecimal format.
///
/// This field may not be available if the file hashes aren't
/// confirmed.
pub file_hash: Option<String>,
/// The size of the compressed chunk.
///
/// This field may not be available if the file hashes aren't
/// confirmed.
pub file_size: Option<i64>,
/// The type of compression in use.
#[sea_orm(column_type = "String(Some(10))")]
pub compression: String,
/// The remote file backing this chunk.
pub remote_file: Json<RemoteFile>,
/// Unique string identifying the remote file.
#[sea_orm(unique)]
pub remote_file_id: String,
/// Number of processes holding this chunk.
///
/// This is for preventing garbage collection of chunks when
/// there is a pending upload that can be deduplicated and
/// there are no existing NAR references.
pub holders_count: i32,
/// Timestamp when the chunk is created.
pub created_at: ChronoDateTimeUtc,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(has_many = "super::chunkref::Entity")]
ChunkRef,
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -0,0 +1,84 @@
//! A reference binding a NAR and a chunk.
//!
//! A NAR is backed by a sequence of chunks.
//!
//! A chunk may become unavailable (e.g., disk corruption) and
//! removed from the database, in which case all dependent NARs
//! will become unavailable.
//!
//! Such a scenario can be recovered from by re-uploading any object
//! that has the missing chunk. `atticadm` will have the functionality
//! to kill/delete a corrupted chunk from the database and to find
//! objects with missing chunks so they can be repaired.
use sea_orm::entity::prelude::*;
pub type ChunkRefModel = Model;
/// A reference binding a NAR to a chunk.
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "chunkref")]
pub struct Model {
/// Unique numeric ID of the link.
#[sea_orm(primary_key)]
pub id: i64,
/// ID of the NAR.
#[sea_orm(indexed)]
pub nar_id: i64,
/// The zero-indexed sequence number of the chunk.
pub seq: i32,
/// ID of the chunk.
///
/// This may be NULL when the chunk is missing from the
/// database.
#[sea_orm(indexed)]
pub chunk_id: Option<i64>,
/// The hash of the uncompressed chunk.
///
/// This always begins with "sha256:" with the hash in the
/// hexadecimal format.
///
/// This is used for recovering from a missing chunk.
#[sea_orm(indexed)]
pub chunk_hash: String,
/// The compression of the compressed chunk.
///
/// This is used for recovering from a missing chunk.
pub compression: String,
}
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
#[sea_orm(
belongs_to = "super::chunk::Entity",
from = "Column::ChunkId",
to = "super::chunk::Column::Id"
)]
Chunk,
#[sea_orm(
belongs_to = "super::nar::Entity",
from = "Column::NarId",
to = "super::nar::Column::Id"
)]
Nar,
}
impl Related<super::chunk::Entity> for Entity {
fn to() -> RelationDef {
Relation::Chunk.def()
}
}
impl Related<super::nar::Entity> for Entity {
fn to() -> RelationDef {
Relation::Nar.def()
}
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -3,6 +3,8 @@
//! We use SeaORM and target PostgreSQL (production) and SQLite (development).
pub mod cache;
pub mod chunk;
pub mod chunkref;
pub mod nar;
pub mod object;

View file

@ -2,9 +2,6 @@
use sea_orm::entity::prelude::*;
use super::Json;
use crate::storage::RemoteFile;
pub type NarModel = Model;
/// The state of a NAR.
@ -35,6 +32,7 @@ pub enum NarState {
/// The NAR is being deleted.
///
/// This row will be deleted shortly.
/// This variant is no longer used since the actual storage is managed as chunks.
#[sea_orm(string_value = "D")]
Deleted,
}
@ -44,6 +42,27 @@ pub enum NarState {
/// A NAR without `nix-store --export` metadata is context-free,
/// meaning that it's not associated with a store path and only
/// depends on its contents.
///
/// ## NAR Repair
///
/// After a NAR is transitioned into the `Valid` state, its list
/// of constituent chunks in `chunkref` is immutable. When a client
/// uploads an existing NAR and the NAR has unavailable chunks,
/// a new `nar` entry is created and all dependent `object` rows
/// will have the `nar_id` updated. The old `nar` entry will
/// be garbage-collected.
///
/// Why don't we just fill in the missing chunks in the existing
/// `nar`? Because the NAR stream from the client _might_ be chunked
/// differently. This is not supposed to happen since FastCDC
/// has a deterministic lookup table for cut-point judgment, however
/// we want the system to tolerate different chunking behavior because
/// of table changes, for example.
///
/// However, when a chunk is added, all broken `chunkref`s with
/// the same `chunk_hash` _are_ repaired. In other words, by
/// re-uploading a broken NAR you are helping other NARs with
/// the same broken chunk.
#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel)]
#[sea_orm(table_name = "nar")]
pub struct Model {
@ -70,31 +89,24 @@ pub struct Model {
/// The size of the NAR archive.
pub nar_size: i64,
/// The hash of the compressed file.
///
/// This always begins with "sha256:" with the hash in the
/// hexadecimal format.
///
/// This field may not be available if the file hashes aren't
/// confirmed.
pub file_hash: Option<String>,
/// The size of the compressed file.
///
/// This field may not be available if the file hashes aren't
/// confirmed.
pub file_size: Option<i64>,
/// The type of compression in use.
#[sea_orm(column_type = "String(Some(10))")]
pub compression: String,
/// The remote file backing this NAR.
pub remote_file: Json<RemoteFile>,
/// Number of chunks that make up this NAR.
pub num_chunks: i32,
/// Unique string identifying the remote file.
#[sea_orm(unique)]
pub remote_file_id: String,
/// Hint indicating whether all chunks making up this NAR are available.
///
/// This is used by the `get-missing-paths` endpoint to
/// also return store paths that are inaccessible due to
/// missing chunks in the associated NARs. They can then be
/// repaired by any client uploading.
///
/// This flag may be outdated, but it's okay since when a client
/// tries to upload the same NAR, it will be immediately deduplicated
/// if all chunks are present and the flag will be updated.
pub completeness_hint: bool,
/// Number of processes holding this NAR.
///
@ -111,6 +123,9 @@ pub struct Model {
pub enum Relation {
#[sea_orm(has_many = "super::object::Entity")]
Object,
#[sea_orm(has_many = "super::chunkref::Entity")]
ChunkRef,
}
impl ActiveModelBehavior for ActiveModel {}

View file

@ -90,12 +90,6 @@ pub enum Relation {
impl Model {
/// Converts this object to a NarInfo.
pub fn to_nar_info(&self, nar: &NarModel) -> ServerResult<NarInfo> {
// FIXME: Return Err if file_hash and file_size don't exist
let file_size = nar
.file_size
.unwrap()
.try_into()
.map_err(ServerError::database_error)?;
let nar_size = nar
.nar_size
.try_into()
@ -106,8 +100,8 @@ impl Model {
url: format!("nar/{}.nar", self.store_path_hash.as_str()),
compression: Compression::from_str(&nar.compression)?,
file_hash: Hash::from_typed(nar.file_hash.as_ref().unwrap())?,
file_size,
file_hash: None, // FIXME
file_size: None, // FIXME
nar_hash: Hash::from_typed(&nar.nar_hash)?,
nar_size,
system: self.system.to_owned(),

View file

@ -33,12 +33,16 @@ impl MigrationTrait for Migration {
)
.col(ColumnDef::new(Column::NarHash).string().not_null())
.col(ColumnDef::new(Column::NarSize).big_integer().not_null())
.col(ColumnDef::new(Column::FileHash).string().null())
.col(ColumnDef::new(Column::FileSize).big_integer().null())
.col(ColumnDef::new(Alias::new("file_hash")).string().null())
.col(ColumnDef::new(Alias::new("file_size")).big_integer().null())
.col(ColumnDef::new(Column::Compression).string().not_null())
.col(ColumnDef::new(Column::RemoteFile).string().not_null())
.col(
ColumnDef::new(Column::RemoteFileId)
ColumnDef::new(Alias::new("remote_file"))
.string()
.not_null(),
)
.col(
ColumnDef::new(Alias::new("remote_file_id"))
.string()
.not_null()
.unique_key(),

View file

@ -0,0 +1,70 @@
use sea_orm_migration::prelude::*;
use crate::database::entity::chunk::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000001_add_chunk_table"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Entity)
.col(
ColumnDef::new(Column::Id)
.big_integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(Column::State)
.r#char()
.char_len(1)
.not_null(),
)
.col(ColumnDef::new(Column::ChunkHash).string().not_null())
.col(ColumnDef::new(Column::ChunkSize).big_integer().not_null())
.col(ColumnDef::new(Alias::new("file_hash")).string().null())
.col(ColumnDef::new(Alias::new("file_size")).big_integer().null())
.col(ColumnDef::new(Column::Compression).string().not_null())
.col(ColumnDef::new(Column::RemoteFile).string().not_null())
.col(
ColumnDef::new(Column::RemoteFileId)
.string()
.not_null()
.unique_key(),
)
.col(
ColumnDef::new(Column::HoldersCount)
.integer()
.not_null()
.default(0),
)
.col(
ColumnDef::new(Column::CreatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx-chunk-chunk-hash")
.table(Entity)
.col(Column::ChunkHash)
.to_owned(),
)
.await
}
}

View file

@ -0,0 +1,76 @@
use sea_orm_migration::prelude::*;
use crate::database::entity::chunk;
use crate::database::entity::chunkref::*;
use crate::database::entity::nar;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000002_add_chunkref_table"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Entity)
.col(
ColumnDef::new(Column::Id)
.big_integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Column::NarId).big_integer().not_null())
.col(ColumnDef::new(Column::Seq).integer().not_null())
.col(ColumnDef::new(Column::ChunkId).big_integer().null())
.col(ColumnDef::new(Column::ChunkHash).string().not_null())
.col(ColumnDef::new(Column::Compression).string().not_null())
.foreign_key(
ForeignKeyCreateStatement::new()
.name("fk_chunkref_chunk")
.from_tbl(Entity)
.from_col(Column::ChunkId)
.to_tbl(chunk::Entity)
.to_col(chunk::Column::Id)
.on_delete(ForeignKeyAction::SetNull),
)
.foreign_key(
ForeignKeyCreateStatement::new()
.name("fk_chunkref_nar")
.from_tbl(Entity)
.from_col(Column::NarId)
.to_tbl(nar::Entity)
.to_col(nar::Column::Id)
.on_delete(ForeignKeyAction::Cascade),
)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx-chunk-nar-id")
.table(Entity)
.col(Column::NarId)
.to_owned(),
)
.await?;
manager
.create_index(
Index::create()
.name("idx-chunk-chunk-id")
.table(Entity)
.col(Column::ChunkId)
.to_owned(),
)
.await
}
}

View file

@ -0,0 +1,32 @@
use sea_orm_migration::prelude::*;
use crate::database::entity::nar::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000003_add_nar_num_chunks"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Entity)
.add_column(
ColumnDef::new(Column::NumChunks)
.integer()
.not_null()
.default(1),
)
.to_owned(),
)
.await?;
Ok(())
}
}

View file

@ -0,0 +1,143 @@
use sea_orm::{ConnectionTrait, TransactionTrait};
use sea_orm_migration::prelude::*;
use crate::database::entity::chunk;
use crate::database::entity::chunkref;
use crate::database::entity::nar;
pub struct Migration;
pub enum TempChunkCols {
/// The ID of the NAR.
NarId,
}
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000004_migrate_nar_remote_files_to_chunks"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
// When this migration is run, we assume that there are no
// preexisting chunks.
eprintln!("* Migrating NARs to chunks...");
// Add a temporary column into `chunk` to store the related `nar_id`.
manager
.alter_table(
Table::alter()
.table(chunk::Entity)
.add_column_if_not_exists(
ColumnDef::new(TempChunkCols::NarId).integer().not_null(),
)
.to_owned(),
)
.await?;
// Get the original values from NARs
let select_remote_file = Query::select()
.from(nar::Entity)
.columns([
nar::Column::Id.into_iden(),
Alias::new("remote_file").into_iden(),
Alias::new("remote_file_id").into_iden(),
nar::Column::NarHash.into_iden(),
nar::Column::NarSize.into_iden(),
Alias::new("file_hash").into_iden(),
Alias::new("file_size").into_iden(),
nar::Column::Compression.into_iden(),
nar::Column::CreatedAt.into_iden(),
])
.expr_as(chunk::ChunkState::Valid, chunk::Column::State.into_iden())
.to_owned();
// ... insert them into the `chunk` table
let insert_chunk = Query::insert()
.into_table(chunk::Entity)
.columns([
TempChunkCols::NarId.into_iden(),
chunk::Column::RemoteFile.into_iden(),
chunk::Column::RemoteFileId.into_iden(),
chunk::Column::ChunkHash.into_iden(),
chunk::Column::ChunkSize.into_iden(),
chunk::Column::FileHash.into_iden(),
chunk::Column::FileSize.into_iden(),
chunk::Column::Compression.into_iden(),
chunk::Column::CreatedAt.into_iden(),
chunk::Column::State.into_iden(),
])
.select_from(select_remote_file)
.unwrap()
.returning(Query::returning().columns([
chunk::Column::Id.into_column_ref(),
TempChunkCols::NarId.into_column_ref(),
]))
.to_owned();
let insert_chunk_stmt = manager.get_database_backend().build(&insert_chunk);
// ... then create chunkrefs binding the chunks and original NARs
let select_chunk = Query::select()
.from(chunk::Entity)
.columns([
chunk::Column::Id.into_iden(),
TempChunkCols::NarId.into_iden(),
chunk::Column::ChunkHash.into_iden(),
chunk::Column::Compression.into_iden(),
])
.expr_as(0, chunkref::Column::Seq.into_iden())
.to_owned();
let insert_chunkref = Query::insert()
.into_table(chunkref::Entity)
.columns([
chunkref::Column::ChunkId.into_iden(),
chunkref::Column::NarId.into_iden(),
chunkref::Column::ChunkHash.into_iden(),
chunkref::Column::Compression.into_iden(),
chunkref::Column::Seq.into_iden(),
])
.select_from(select_chunk)
.unwrap()
.returning(Query::returning().columns([chunkref::Column::Id.into_column_ref()]))
.to_owned();
let insert_chunkref_stmt = manager.get_database_backend().build(&insert_chunkref);
// Actually run the migration
let txn = manager.get_connection().begin().await?;
txn.execute(insert_chunk_stmt).await?;
txn.execute(insert_chunkref_stmt).await?;
txn.commit().await?;
// Finally, drop the temporary column
manager
.alter_table(
Table::alter()
.table(chunk::Entity)
.drop_column(TempChunkCols::NarId)
.to_owned(),
)
.await?;
// We will drop the unused columns in `nar` in the next migration
Ok(())
}
}
impl Iden for TempChunkCols {
fn unquoted(&self, s: &mut dyn std::fmt::Write) {
write!(
s,
"{}",
match self {
Self::NarId => "temp_nar_id",
}
)
.unwrap();
}
}

View file

@ -0,0 +1,159 @@
use sea_orm::{ConnectionTrait, DatabaseBackend, Statement};
use sea_orm_migration::prelude::*;
use crate::database::entity::nar::{self, *};
pub struct Migration;
const TEMP_NAR_TABLE: &str = "nar_new";
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000005_drop_old_nar_columns"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
eprintln!("* Migrating NAR schema...");
if manager.get_database_backend() == DatabaseBackend::Sqlite {
// Just copy all data to a new table
manager
.get_connection()
.execute(Statement::from_string(
manager.get_database_backend(),
"PRAGMA foreign_keys = OFF".to_owned(),
))
.await?;
manager
.create_table(
Table::create()
.table(Alias::new(TEMP_NAR_TABLE))
.if_not_exists()
.col(
ColumnDef::new(Column::Id)
.big_integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(Column::State)
.r#char()
.char_len(1)
.not_null(),
)
.col(ColumnDef::new(Column::NarHash).string().not_null())
.col(ColumnDef::new(Column::NarSize).big_integer().not_null())
.col(ColumnDef::new(Column::Compression).string().not_null())
.col(
ColumnDef::new(Column::NumChunks)
.integer()
.not_null()
.default(1),
)
.col(
ColumnDef::new(Column::HoldersCount)
.integer()
.not_null()
.default(0),
)
.col(
ColumnDef::new(Column::CreatedAt)
.timestamp_with_time_zone()
.not_null(),
)
.to_owned(),
)
.await?;
let columns = [
nar::Column::Id.into_iden(),
nar::Column::State.into_iden(),
nar::Column::NarHash.into_iden(),
nar::Column::NarSize.into_iden(),
nar::Column::Compression.into_iden(),
nar::Column::NumChunks.into_iden(),
nar::Column::HoldersCount.into_iden(),
nar::Column::CreatedAt.into_iden(),
];
let select_nar = Query::select()
.from(nar::Entity)
.columns(columns.clone())
.to_owned();
let insertion = Query::insert()
.into_table(Alias::new(TEMP_NAR_TABLE))
.columns(columns.clone())
.select_from(select_nar)
.unwrap()
.to_owned();
let insertion_stmt = manager.get_database_backend().build(&insertion);
manager.get_connection().execute(insertion_stmt).await?;
manager
.drop_table(Table::drop().table(nar::Entity).to_owned())
.await?;
manager
.rename_table(
Table::rename()
.table(Alias::new(TEMP_NAR_TABLE), nar::Entity)
.to_owned(),
)
.await?;
manager
.get_connection()
.execute(Statement::from_string(
manager.get_database_backend(),
"PRAGMA foreign_keys = ON".to_owned(),
))
.await?;
} else {
// Just drop the columns
manager
.alter_table(
Table::alter()
.table(nar::Entity)
.drop_column(Alias::new("file_hash"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(nar::Entity)
.drop_column(Alias::new("file_size"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(nar::Entity)
.drop_column(Alias::new("remote_file"))
.to_owned(),
)
.await?;
manager
.alter_table(
Table::alter()
.table(nar::Entity)
.drop_column(Alias::new("remote_file_id"))
.to_owned(),
)
.await?;
}
Ok(())
}
}

View file

@ -0,0 +1,32 @@
use sea_orm_migration::prelude::*;
use crate::database::entity::nar::*;
pub struct Migration;
impl MigrationName for Migration {
fn name(&self) -> &str {
"m20230112_000006_add_nar_completeness_hint"
}
}
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.alter_table(
Table::alter()
.table(Entity)
.add_column(
ColumnDef::new(Column::CompletenessHint)
.boolean()
.not_null()
.default(true),
)
.to_owned(),
)
.await?;
Ok(())
}
}

View file

@ -8,6 +8,12 @@ mod m20221227_000003_create_object_table;
mod m20221227_000004_add_object_last_accessed;
mod m20221227_000005_add_cache_retention_period;
mod m20230103_000001_add_object_created_by;
mod m20230112_000001_add_chunk_table;
mod m20230112_000002_add_chunkref_table;
mod m20230112_000003_add_nar_num_chunks;
mod m20230112_000004_migrate_nar_remote_files_to_chunks;
mod m20230112_000005_drop_old_nar_columns;
mod m20230112_000006_add_nar_completeness_hint;
pub struct Migrator;
@@ -21,6 +27,12 @@ impl MigratorTrait for Migrator {
Box::new(m20221227_000004_add_object_last_accessed::Migration),
Box::new(m20221227_000005_add_cache_retention_period::Migration),
Box::new(m20230103_000001_add_object_created_by::Migration),
Box::new(m20230112_000001_add_chunk_table::Migration),
Box::new(m20230112_000002_add_chunkref_table::Migration),
Box::new(m20230112_000003_add_nar_num_chunks::Migration),
Box::new(m20230112_000004_migrate_nar_remote_files_to_chunks::Migration),
Box::new(m20230112_000005_drop_old_nar_columns::Migration),
Box::new(m20230112_000006_add_nar_completeness_hint::Migration),
]
}
}

View file

@@ -3,35 +3,43 @@ pub mod migration;
use std::ops::Deref;
use anyhow::anyhow;
use async_trait::async_trait;
use chrono::Utc;
use sea_orm::entity::prelude::*;
use sea_orm::entity::Iterable as EnumIterable;
use sea_orm::query::{JoinType, QuerySelect, QueryTrait};
use sea_orm::query::{JoinType, QueryOrder, QuerySelect, QueryTrait};
use sea_orm::sea_query::{Expr, LockBehavior, LockType, Query, Value};
use sea_orm::{ActiveValue::Set, ConnectionTrait, DatabaseConnection, FromQueryResult};
use tokio::task;
use crate::error::{ErrorKind, ServerError, ServerResult};
use crate::narinfo::Compression;
use attic::cache::CacheName;
use attic::hash::Hash;
use attic::nix_store::StorePathHash;
use entity::cache::{self, CacheModel, Entity as Cache};
use entity::chunk::{self, ChunkModel, ChunkState, Entity as Chunk};
use entity::chunkref;
use entity::nar::{self, Entity as Nar, NarModel, NarState};
use entity::object::{self, Entity as Object, ObjectModel};
// quintuple join time
const SELECT_OBJECT: &str = "O_";
const SELECT_CACHE: &str = "C_";
const SELECT_NAR: &str = "N_";
const SELECT_CHUNK: &str = "CH_";
const SELECT_CHUNKREF: &str = "CHR_";
#[async_trait]
pub trait AtticDatabase: Send + Sync {
/// Retrieves an object in a binary cache by its store path hash.
async fn find_object_by_store_path_hash(
/// Retrieves an object in a binary cache by its store path hash, returning all its
/// chunks.
async fn find_object_and_chunks_by_store_path_hash(
&self,
cache: &CacheName,
store_path_hash: &StorePathHash,
) -> ServerResult<(ObjectModel, CacheModel, NarModel)>;
) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<Option<ChunkModel>>)>;
/// Retrieves a binary cache.
async fn find_cache(&self, cache: &CacheName) -> ServerResult<CacheModel>;
@@ -39,6 +47,13 @@ pub trait AtticDatabase: Send + Sync {
/// Retrieves and locks a valid NAR matching a NAR Hash.
async fn find_and_lock_nar(&self, nar_hash: &Hash) -> ServerResult<Option<NarGuard>>;
/// Retrieves and locks a valid chunk matching a chunk Hash.
async fn find_and_lock_chunk(
&self,
chunk_hash: &Hash,
compression: Compression,
) -> ServerResult<Option<ChunkGuard>>;
/// Bumps the last accessed timestamp of an object.
async fn bump_object_last_accessed(&self, object_id: i64) -> ServerResult<()>;
}
@@ -48,6 +63,11 @@ pub struct NarGuard {
nar: NarModel,
}
pub struct ChunkGuard {
database: DatabaseConnection,
chunk: ChunkModel,
}
fn prefix_column<E: EntityTrait, S: QuerySelect>(mut select: S, prefix: &str) -> S {
for col in <E::Column as EnumIterable>::iter() {
let alias = format!("{}{}", prefix, Iden::to_string(&col));
@ -57,47 +77,139 @@ fn prefix_column<E: EntityTrait, S: QuerySelect>(mut select: S, prefix: &str) ->
}
pub fn build_cache_object_nar_query() -> Select<Object> {
/*
Build something like:
-- chunkrefs must exist but chunks may not exist
select * from object
inner join cache
on object.cache_id = cache.id
inner join nar
on object.nar_id = nar.id
inner join chunkref
on chunkref.nar_id = nar.id
left join chunk
on chunkref.chunk_id = chunk.id
where
object.store_path_hash = 'fiwsv60kgwrfvib2nf9dkq9q8bk1h7qh' and
nar.state = 'V' and
cache.name = 'zhaofeng' and
cache.deleted_at is null
Returns (ObjectModel, CacheModel, NarModel, Vec<Option<ChunkModel>>)
where the number of elements in the Vec must be equal to `nar.num_chunks`.
If any element in the chunk `Vec` is `None`, it means the chunk is missing
for some reason (e.g., corrupted) and the full NAR cannot be reconstructed.
In such cases, .narinfo/.nar requests will return HTTP 503 and the affected
store paths will be treated as non-existent in `get-missing-paths` so they
can be repaired automatically when any client uploads a path containing the
missing chunk.
It's a quintuple join and the query plans look reasonable on SQLite
and Postgres. For each .narinfo/.nar request, we only submit a single query.
*/
let mut query = Object::find()
.select_only()
.join(JoinType::LeftJoin, object::Relation::Cache.def())
.join(JoinType::LeftJoin, object::Relation::Nar.def());
.join(JoinType::InnerJoin, object::Relation::Cache.def())
.join(JoinType::InnerJoin, object::Relation::Nar.def())
.join(JoinType::InnerJoin, nar::Relation::ChunkRef.def())
.join(JoinType::LeftJoin, chunkref::Relation::Chunk.def())
.order_by_asc(chunkref::Column::Seq);
query = prefix_column::<object::Entity, _>(query, SELECT_OBJECT);
query = prefix_column::<cache::Entity, _>(query, SELECT_CACHE);
query = prefix_column::<nar::Entity, _>(query, SELECT_NAR);
query = prefix_column::<chunk::Entity, _>(query, SELECT_CHUNK);
query = prefix_column::<chunkref::Entity, _>(query, SELECT_CHUNKREF);
query
}
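As a sketch of how a caller might consume the chunk list this query produces (the helper name `check_chunks_available` is hypothetical; the real .narinfo/.nar handlers live in the API module), any `None` entry turns into `ErrorKind::IncompleteNar`, which maps to HTTP 503:
// Hypothetical handler fragment: a single missing chunk makes the NAR
// unservable, surfacing as ErrorKind::IncompleteNar per the error module.
async fn check_chunks_available(
    db: &DatabaseConnection,
    cache_name: &CacheName,
    store_path_hash: &StorePathHash,
) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<ChunkModel>)> {
    let (object, cache, nar, chunks) = db
        .find_object_and_chunks_by_store_path_hash(cache_name, store_path_hash)
        .await?;

    // A `None` entry means a chunkref points at a chunk that no longer exists.
    let chunks: Vec<ChunkModel> = match chunks.into_iter().collect() {
        Some(chunks) => chunks,
        None => return Err(ErrorKind::IncompleteNar.into()),
    };

    Ok((object, cache, nar, chunks))
}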
#[async_trait]
impl AtticDatabase for DatabaseConnection {
async fn find_object_by_store_path_hash(
async fn find_object_and_chunks_by_store_path_hash(
&self,
cache: &CacheName,
store_path_hash: &StorePathHash,
) -> ServerResult<(ObjectModel, CacheModel, NarModel)> {
) -> ServerResult<(ObjectModel, CacheModel, NarModel, Vec<Option<ChunkModel>>)> {
let stmt = build_cache_object_nar_query()
.filter(cache::Column::Name.eq(cache.as_str()))
.filter(cache::Column::DeletedAt.is_null())
.filter(object::Column::StorePathHash.eq(store_path_hash.as_str()))
.filter(nar::Column::State.eq(NarState::Valid))
.limit(1)
.filter(
chunk::Column::State
.eq(ChunkState::Valid)
.or(chunk::Column::State.is_null()),
)
.build(self.get_database_backend());
let result = self
.query_one(stmt)
let results = self
.query_all(stmt)
.await
.map_err(ServerError::database_error)?
.ok_or(ErrorKind::NoSuchObject)?;
let object = object::Model::from_query_result(&result, SELECT_OBJECT)
.map_err(ServerError::database_error)?;
let cache = cache::Model::from_query_result(&result, SELECT_CACHE)
.map_err(ServerError::database_error)?;
let nar = nar::Model::from_query_result(&result, SELECT_NAR)
.map_err(ServerError::database_error)?;
Ok((object, cache, nar))
if results.is_empty() {
return Err(ErrorKind::NoSuchObject.into());
}
let mut it = results.iter();
let first = it.next().unwrap();
let mut chunks = Vec::new();
let object = object::Model::from_query_result(first, SELECT_OBJECT)
.map_err(ServerError::database_error)?;
let cache = cache::Model::from_query_result(first, SELECT_CACHE)
.map_err(ServerError::database_error)?;
let nar = nar::Model::from_query_result(first, SELECT_NAR)
.map_err(ServerError::database_error)?;
if results.len() != nar.num_chunks as usize {
// Something went terribly wrong. This means the database has the wrong number of `chunkref` rows.
return Err(ErrorKind::DatabaseError(anyhow!(
"Database returned the wrong number of chunks: Expected {}, got {}",
nar.num_chunks,
results.len()
))
.into());
}
chunks.push({
let chunk_id: Option<i64> = first
.try_get(SELECT_CHUNK, chunk::Column::Id.as_str())
.map_err(ServerError::database_error)?;
if chunk_id.is_some() {
Some(
chunk::Model::from_query_result(first, SELECT_CHUNK)
.map_err(ServerError::database_error)?,
)
} else {
None
}
});
for chunk in it {
chunks.push({
let chunk_id: Option<i64> = chunk
.try_get(SELECT_CHUNK, chunk::Column::Id.as_str())
.map_err(ServerError::database_error)?;
if chunk_id.is_some() {
Some(
chunk::Model::from_query_result(chunk, SELECT_CHUNK)
.map_err(ServerError::database_error)?,
)
} else {
None
}
});
}
Ok((object, cache, nar, chunks))
}
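The first-row/remaining-rows duplication above could plausibly be factored into one helper; a sketch (assuming `sea_orm::QueryResult` is in scope), after which both the first row and the loop collapse to `chunks.push(chunk_from_row(row)?)`:
// Possible cleanup (sketch): decode the optional chunk column set from any row.
fn chunk_from_row(row: &sea_orm::QueryResult) -> ServerResult<Option<ChunkModel>> {
    let chunk_id: Option<i64> = row
        .try_get(SELECT_CHUNK, chunk::Column::Id.as_str())
        .map_err(ServerError::database_error)?;

    chunk_id
        .map(|_| {
            chunk::Model::from_query_result(row, SELECT_CHUNK)
                .map_err(ServerError::database_error)
        })
        .transpose()
}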
async fn find_cache(&self, cache: &CacheName) -> ServerResult<CacheModel> {
@@ -143,6 +255,45 @@ impl AtticDatabase for DatabaseConnection {
Ok(guard)
}
// FIXME: Repetition
async fn find_and_lock_chunk(
&self,
chunk_hash: &Hash,
compression: Compression,
) -> ServerResult<Option<ChunkGuard>> {
let one = Value::Unsigned(Some(1));
let matched_ids = Query::select()
.from(Chunk)
.and_where(chunk::Column::ChunkHash.eq(chunk_hash.to_typed_base16()))
.and_where(chunk::Column::State.eq(ChunkState::Valid))
.and_where(chunk::Column::Compression.eq(compression.as_str()))
.expr(Expr::col(chunk::Column::Id))
.lock_with_behavior(LockType::Update, LockBehavior::SkipLocked)
.limit(1)
.to_owned();
let incr_holders = Query::update()
.table(Chunk)
.values([(
chunk::Column::HoldersCount,
Expr::col(chunk::Column::HoldersCount).add(one),
)])
.and_where(chunk::Column::Id.in_subquery(matched_ids))
.returning_all()
.to_owned();
let stmt = self.get_database_backend().build(&incr_holders);
let guard = chunk::Model::find_by_statement(stmt)
.one(self)
.await
.map_err(ServerError::database_error)?
.map(|chunk| ChunkGuard {
database: self.clone(),
chunk,
});
Ok(guard)
}
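A hedged sketch of the intended call site during upload (the helper and its surrounding deduplication logic are hypothetical; only `find_and_lock_chunk` and `ChunkGuard` come from this module). Holding the guard keeps `holders_count` above zero, which is what the orphan-chunk GC later checks before reaping:
// Hypothetical upload path: reuse an existing valid chunk instead of
// re-uploading it. While `guard` is alive, holders_count > 0, so the chunk
// cannot be reaped even if its last chunkref disappears concurrently.
async fn existing_chunk_id(
    db: &DatabaseConnection,
    chunk_hash: &Hash,
    compression: Compression,
) -> ServerResult<Option<i64>> {
    if let Some(guard) = db.find_and_lock_chunk(chunk_hash, compression).await? {
        // ... insert a chunkref row pointing at `guard.id` here ...
        return Ok(Some(guard.id));
    }

    Ok(None) // no reusable chunk; the caller uploads a new one
}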
async fn bump_object_last_accessed(&self, object_id: i64) -> ServerResult<()> {
let now = Utc::now();
@@ -192,3 +343,43 @@ impl Drop for NarGuard {
});
}
}
impl ChunkGuard {
pub fn from_locked(database: DatabaseConnection, chunk: ChunkModel) -> Self {
Self { database, chunk }
}
}
impl Deref for ChunkGuard {
type Target = ChunkModel;
fn deref(&self) -> &Self::Target {
&self.chunk
}
}
impl Drop for ChunkGuard {
fn drop(&mut self) {
let database = self.database.clone();
let chunk_id = self.chunk.id;
task::spawn(async move {
tracing::debug!("Unlocking chunk");
let one = Value::Unsigned(Some(1));
let decr_holders = Query::update()
.table(Chunk)
.values([(
chunk::Column::HoldersCount,
Expr::col(chunk::Column::HoldersCount).sub(one),
)])
.and_where(chunk::Column::Id.eq(chunk_id))
.to_owned();
let stmt = database.get_database_backend().build(&decr_holders);
if let Err(e) = database.execute(stmt).await {
tracing::warn!("Failed to decrement holders count: {}", e);
}
});
}
}

View file

@@ -54,6 +54,9 @@ pub enum ErrorKind {
/// Invalid compression type "{name}".
InvalidCompressionType { name: String },
/// The requested NAR has missing chunks and needs to be repaired.
IncompleteNar,
/// Database error: {0}
DatabaseError(AnyError),
@@ -174,6 +177,7 @@ impl ErrorKind {
Self::NoSuchCache => "NoSuchCache",
Self::CacheAlreadyExists => "CacheAlreadyExists",
Self::InvalidCompressionType { .. } => "InvalidCompressionType",
Self::IncompleteNar => "IncompleteNar",
Self::AtticError(e) => e.name(),
Self::DatabaseError(_) => "DatabaseError",
Self::StorageError(_) => "StorageError",
@@ -218,6 +222,7 @@ impl ErrorKind {
Self::NoSuchCache => StatusCode::NOT_FOUND,
Self::NoSuchObject => StatusCode::NOT_FOUND,
Self::CacheAlreadyExists => StatusCode::BAD_REQUEST,
Self::IncompleteNar => StatusCode::SERVICE_UNAVAILABLE,
Self::ManifestSerializationError(_) => StatusCode::BAD_REQUEST,
Self::RequestError(_) => StatusCode::BAD_REQUEST,
Self::InvalidCompressionType { .. } => StatusCode::BAD_REQUEST,

View file

@@ -17,6 +17,8 @@ use tracing::instrument;
use super::{State, StateInner};
use crate::config::Config;
use crate::database::entity::cache::{self, Entity as Cache};
use crate::database::entity::chunk::{self, ChunkState, Entity as Chunk};
use crate::database::entity::chunkref::{self, Entity as ChunkRef};
use crate::database::entity::nar::{self, Entity as Nar, NarState};
use crate::database::entity::object::{self, Entity as Object};
@@ -54,6 +56,7 @@ pub async fn run_garbage_collection_once(config: Config) -> Result<()> {
let state = StateInner::new(config).await;
run_time_based_garbage_collection(&state).await?;
run_reap_orphan_nars(&state).await?;
run_reap_orphan_chunks(&state).await?;
Ok(())
}
@@ -122,7 +125,6 @@ async fn run_time_based_garbage_collection(state: &State) -> Result<()> {
#[instrument(skip_all)]
async fn run_reap_orphan_nars(state: &State) -> Result<()> {
let db = state.database().await?;
let storage = state.storage().await?;
// find all orphan NARs...
let orphan_nar_ids = Query::select()
@@ -140,46 +142,78 @@ async fn run_reap_orphan_nars(state: &State) -> Result<()> {
.lock_with_tables_behavior(LockType::Update, [Nar], LockBehavior::SkipLocked)
.to_owned();
// ... and simply delete them
let deletion = Nar::delete_many()
.filter(nar::Column::Id.in_subquery(orphan_nar_ids))
.exec(db)
.await?;
tracing::info!("Deleted {} orphan NARs", deletion.rows_affected,);
Ok(())
}
#[instrument(skip_all)]
async fn run_reap_orphan_chunks(state: &State) -> Result<()> {
let db = state.database().await?;
let storage = state.storage().await?;
// find all orphan chunks...
let orphan_chunk_ids = Query::select()
.from(Chunk)
.expr(chunk::Column::Id.into_expr())
.left_join(
ChunkRef,
chunkref::Column::ChunkId
.into_expr()
.eq(chunk::Column::Id.into_expr()),
)
.and_where(chunkref::Column::Id.is_null())
.and_where(chunk::Column::State.eq(ChunkState::Valid))
.and_where(chunk::Column::HoldersCount.eq(0))
.lock_with_tables_behavior(LockType::Update, [Chunk], LockBehavior::SkipLocked)
.to_owned();
// ... and transition their state to Deleted
//
// Deleted NARs are essentially invisible from our normal queries
// Deleted chunks are essentially invisible from our normal queries
let change_state = Query::update()
.table(Nar)
.value(nar::Column::State, NarState::Deleted)
.and_where(nar::Column::Id.in_subquery(orphan_nar_ids))
.table(Chunk)
.value(chunk::Column::State, ChunkState::Deleted)
.and_where(chunk::Column::Id.in_subquery(orphan_chunk_ids))
.returning_all()
.to_owned();
let stmt = db.get_database_backend().build(&change_state);
let orphan_nars = nar::Model::find_by_statement(stmt).all(db).await?;
let orphan_chunks = chunk::Model::find_by_statement(stmt).all(db).await?;
if orphan_nars.is_empty() {
if orphan_chunks.is_empty() {
return Ok(());
}
// Delete the NARs from remote storage
// Delete the chunks from remote storage
let delete_limit = Arc::new(Semaphore::new(20)); // TODO: Make this configurable
let futures: Vec<_> = orphan_nars
let futures: Vec<_> = orphan_chunks
.into_iter()
.map(|nar| {
.map(|chunk| {
let delete_limit = delete_limit.clone();
async move {
let permit = delete_limit.acquire().await?;
storage.delete_file_db(&nar.remote_file.0).await?;
storage.delete_file_db(&chunk.remote_file.0).await?;
drop(permit);
Result::<_, anyhow::Error>::Ok(nar.id)
Result::<_, anyhow::Error>::Ok(chunk.id)
}
})
.collect();
// Deletions can result in spurious failures, so tolerate them
//
// NARs that failed to be deleted from the remote storage will
// Chunks that failed to be deleted from the remote storage will
// just be stuck in Deleted state.
//
// TODO: Maybe have an interactive command to retry deletions?
let deleted_nar_ids: Vec<_> = join_all(futures)
let deleted_chunk_ids: Vec<_> = join_all(futures)
.await
.into_iter()
.filter(|r| {
@@ -193,12 +227,12 @@ async fn run_reap_orphan_nars(state: &State) -> Result<()> {
.collect();
// Finally, delete them from the database
let deletion = Nar::delete_many()
.filter(nar::Column::Id.is_in(deleted_nar_ids))
let deletion = Chunk::delete_many()
.filter(chunk::Column::Id.is_in(deleted_chunk_ids))
.exec(db)
.await?;
tracing::info!("Deleted {} NARs", deletion.rows_affected);
tracing::info!("Deleted {} orphan chunks", deletion.rows_affected);
Ok(())
}
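One possible shape for the retry command hinted at by the TODO above (purely a sketch, not part of this commit): re-scan chunks stuck in the Deleted state and attempt the storage deletion again.
// Sketch: chunks stuck in Deleted state are typically ones whose remote
// files failed to delete; retry them and only then remove the rows.
async fn retry_stuck_deletions(state: &State) -> Result<()> {
    let db = state.database().await?;
    let storage = state.storage().await?;

    let stuck = Chunk::find()
        .filter(chunk::Column::State.eq(ChunkState::Deleted))
        .all(db)
        .await?;

    for chunk in stuck {
        if storage.delete_file_db(&chunk.remote_file.0).await.is_ok() {
            Chunk::delete_by_id(chunk.id).exec(db).await?;
        }
    }

    Ok(())
}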

View file

@@ -15,9 +15,10 @@
pub mod access;
mod api;
mod chunking;
pub mod config;
pub mod database;
mod error;
pub mod error;
pub mod gc;
mod middleware;
mod narinfo;

View file

@@ -86,12 +86,18 @@ pub struct NarInfo {
pub compression: Compression,
/// The hash of the compressed file.
///
/// We don't know the file hash if it's chunked.
#[serde(rename = "FileHash")]
pub file_hash: Hash,
#[serde(skip_serializing_if = "Option::is_none")]
pub file_hash: Option<Hash>,
/// The size of the compressed file.
///
/// We may not know the file size if it's chunked.
#[serde(rename = "FileSize")]
pub file_size: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub file_size: Option<usize>,
/// The hash of the NAR archive.
///
@@ -242,6 +248,18 @@ impl IntoResponse for NarInfo {
}
}
impl Compression {
pub fn as_str(&self) -> &'static str {
match self {
Self::None => "none",
Self::Xz => "xz",
Self::Bzip2 => "bzip2",
Self::Brotli => "br",
Self::Zstd => "zstd",
}
}
}
impl FromStr for Compression {
type Err = ServerError;
@@ -262,13 +280,7 @@ impl FromStr for Compression {
impl ToString for Compression {
fn to_string(&self) -> String {
String::from(match self {
Self::None => "none",
Self::Xz => "xz",
Self::Bzip2 => "bzip2",
Self::Brotli => "br",
Self::Zstd => "zstd",
})
String::from(self.as_str())
}
}

View file

@ -34,9 +34,9 @@ Sig: cache.nixos.org-1:lo9EfNIL4eGRuNh7DTbAAffWPpI2SlYC/8uP7JnhgmfRIUNGhSbFe8qEa
assert_eq!(Compression::Xz, narinfo.compression);
assert_eq!(
"sha256:0nqgf15qfiacfxrgm2wkw0gwwncjqqzzalj8rs14w9srkydkjsk9",
narinfo.file_hash.to_typed_base32()
narinfo.file_hash.as_ref().unwrap().to_typed_base32()
);
assert_eq!(41104, narinfo.file_size);
assert_eq!(Some(41104), narinfo.file_size);
assert_eq!(
"sha256:16mvl7v0ylzcg2n3xzjn41qhzbmgcn5iyarx16nn5l2r36n2kqci",
narinfo.nar_hash.to_typed_base32()

View file

@@ -87,15 +87,19 @@ impl StorageBackend for LocalBackend {
Ok(())
}
async fn download_file(&self, name: String) -> ServerResult<Download> {
async fn download_file(&self, name: String, _prefer_stream: bool) -> ServerResult<Download> {
let file = File::open(self.get_path(&name))
.await
.map_err(ServerError::storage_error)?;
Ok(Download::Stream(Box::new(file)))
Ok(Download::AsyncRead(Box::new(file)))
}
async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download> {
async fn download_file_db(
&self,
file: &RemoteFile,
_prefer_stream: bool,
) -> ServerResult<Download> {
let file = if let RemoteFile::Local(file) = file {
file
} else {
@ -109,7 +113,7 @@ impl StorageBackend for LocalBackend {
.await
.map_err(ServerError::storage_error)?;
Ok(Download::Stream(Box::new(file)))
Ok(Download::AsyncRead(Box::new(file)))
}
async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile> {

View file

@@ -3,6 +3,8 @@
mod local;
mod s3;
use bytes::Bytes;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
@@ -33,11 +35,14 @@ pub enum RemoteFile {
/// Way to download a file.
pub enum Download {
/// A redirect to a (possibly ephemeral) URL.
Redirect(String),
/// A possibly ephemeral URL.
Url(String),
/// A stream.
Stream(Box<dyn AsyncRead + Unpin + Send>),
Stream(BoxStream<'static, std::io::Result<Bytes>>),
/// An AsyncRead.
AsyncRead(Box<dyn AsyncRead + Unpin + Send>),
}
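A small sketch of how a consumer might normalize these variants into one byte stream (`into_byte_stream` is hypothetical; `Url` is assumed to be handled separately, e.g. as an HTTP redirect):
use tokio_util::io::ReaderStream;

// Hypothetical helper: turn a Download into a stream of Bytes where possible.
fn into_byte_stream(download: Download) -> Option<BoxStream<'static, std::io::Result<Bytes>>> {
    match download {
        // A (possibly presigned) URL is better served as a redirect.
        Download::Url(_) => None,
        // Already a byte stream, e.g. an S3 response body.
        Download::Stream(stream) => Some(stream),
        // Wrap a plain AsyncRead (e.g. a local file) into a byte stream.
        Download::AsyncRead(reader) => Some(Box::pin(ReaderStream::new(reader))),
    }
}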
// TODO: Maybe make RemoteFile the one true reference instead of having two sets of APIs?
@@ -58,10 +63,14 @@ pub trait StorageBackend: Send + Sync + std::fmt::Debug {
async fn delete_file_db(&self, file: &RemoteFile) -> ServerResult<()>;
/// Downloads a file using the current configuration.
async fn download_file(&self, name: String) -> ServerResult<Download>;
async fn download_file(&self, name: String, prefer_stream: bool) -> ServerResult<Download>;
/// Downloads a file using a database reference.
async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download>;
async fn download_file_db(
&self,
file: &RemoteFile,
prefer_stream: bool,
) -> ServerResult<Download>;
/// Creates a database reference for a file.
async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile>;

View file

@@ -1,13 +1,18 @@
//! S3 remote files.
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use std::time::Duration;
use async_trait::async_trait;
use aws_sdk_s3::{
config::Builder as S3ConfigBuilder, model::CompletedMultipartUpload, model::CompletedPart,
presigning::config::PresigningConfig, Client, Credentials, Endpoint, Region,
client::fluent_builders::GetObject,
config::Builder as S3ConfigBuilder,
model::{CompletedMultipartUpload, CompletedPart},
presigning::config::PresigningConfig,
Client, Credentials, Endpoint, Region,
};
use futures::future::join_all;
use futures::stream::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
@@ -134,6 +139,29 @@ impl S3Backend {
Ok((client, file))
}
async fn get_download(&self, req: GetObject, prefer_stream: bool) -> ServerResult<Download> {
if prefer_stream {
let output = req.send().await.map_err(ServerError::storage_error)?;
let stream = StreamExt::map(output.body, |item| {
item.map_err(|e| IoError::new(IoErrorKind::Other, e))
});
Ok(Download::Stream(Box::pin(stream)))
} else {
// FIXME: Configurable expiration
let presign_config = PresigningConfig::expires_in(Duration::from_secs(600))
.map_err(ServerError::storage_error)?;
let presigned = req
.presigned(presign_config)
.await
.map_err(ServerError::storage_error)?;
Ok(Download::Url(presigned.uri().to_string()))
}
}
}
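Which branch is taken is left to the caller through `prefer_stream`; presumably chunked NARs have to be streamed so the server can concatenate the chunks itself, while a presigned redirect suffices when a single object can be handed to the client directly. A hypothetical call site (types borrowed from the database module):
// Hypothetical: stream when the NAR is split across multiple chunks,
// otherwise let the client follow a presigned URL.
async fn download_for(
    storage: &dyn StorageBackend,
    nar: &NarModel,
    chunk: &ChunkModel,
) -> ServerResult<Download> {
    let prefer_stream = nar.num_chunks > 1;
    storage
        .download_file_db(&chunk.remote_file.0, prefer_stream)
        .await
}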
#[async_trait]
@@ -313,38 +341,26 @@ impl StorageBackend for S3Backend {
Ok(())
}
async fn download_file(&self, name: String) -> ServerResult<Download> {
// FIXME: Configurable expiration
let presign_config = PresigningConfig::expires_in(Duration::from_secs(10))
.map_err(ServerError::storage_error)?;
let presigned = self
async fn download_file(&self, name: String, prefer_stream: bool) -> ServerResult<Download> {
let req = self
.client
.get_object()
.bucket(&self.config.bucket)
.key(&name)
.presigned(presign_config)
.await
.map_err(ServerError::storage_error)?;
.key(&name);
Ok(Download::Redirect(presigned.uri().to_string()))
self.get_download(req, prefer_stream).await
}
async fn download_file_db(&self, file: &RemoteFile) -> ServerResult<Download> {
async fn download_file_db(
&self,
file: &RemoteFile,
prefer_stream: bool,
) -> ServerResult<Download> {
let (client, file) = self.get_client_from_db_ref(file).await?;
let presign_config = PresigningConfig::expires_in(Duration::from_secs(600))
.map_err(ServerError::storage_error)?;
let req = client.get_object().bucket(&file.bucket).key(&file.key);
let presigned = client
.get_object()
.bucket(&file.bucket)
.key(&file.key)
.presigned(presign_config)
.await
.map_err(ServerError::storage_error)?;
Ok(Download::Redirect(presigned.uri().to_string()))
self.get_download(req, prefer_stream).await
}
async fn make_db_reference(&self, name: String) -> ServerResult<RemoteFile> {