From 8b52796dcb8e1f1620a9ed774d94286fa3f43074 Mon Sep 17 00:00:00 2001 From: Zhaofeng Li Date: Tue, 17 Jan 2023 14:10:27 -0700 Subject: [PATCH] server/upload_path: Create chunkrefs during the upload --- server/src/api/v1/upload_path.rs | 107 ++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 38 deletions(-) diff --git a/server/src/api/v1/upload_path.rs b/server/src/api/v1/upload_path.rs index ab6d11d..3820d3c 100644 --- a/server/src/api/v1/upload_path.rs +++ b/server/src/api/v1/upload_path.rs @@ -296,6 +296,45 @@ async fn upload_path_new_chunked( let nar_size_db = i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?; + // Create a pending NAR entry + let nar_id = { + let model = nar::ActiveModel { + state: Set(NarState::PendingUpload), + compression: Set(compression.to_string()), + + nar_hash: Set(upload_info.nar_hash.to_typed_base16()), + nar_size: Set(nar_size_db), + + num_chunks: Set(0), + + created_at: Set(Utc::now()), + ..Default::default() + }; + + let insertion = Nar::insert(model) + .exec(database) + .await + .map_err(ServerError::database_error)?; + + insertion.last_insert_id + }; + + let cleanup = Finally::new({ + let database = database.clone(); + let nar_model = nar::ActiveModel { + id: Set(nar_id), + ..Default::default() + }; + + async move { + tracing::warn!("Error occurred - Cleaning up NAR entry"); + + if let Err(e) = Nar::delete(nar_model).exec(&database).await { + tracing::warn!("Failed to unregister failed NAR: {}", e); + } + } + }); + // FIXME: Maybe the client will send much more data than claimed let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new()); let mut chunks = chunk_stream( @@ -308,6 +347,7 @@ async fn upload_path_new_chunked( let upload_chunk_limit = Arc::new(Semaphore::new(CONCURRENT_CHUNK_UPLOADS)); let mut futures = Vec::new(); + let mut chunk_idx = 0; while let Some(bytes) = chunks.next().await { let bytes = bytes.map_err(ServerError::request_error)?; let data = ChunkData::Bytes(bytes); @@ -327,15 +367,31 @@ async fn upload_path_new_chunked( data, compression_type, compression_level, - database, + database.clone(), state, require_proof_of_possession, ) .await?; + + // Create mapping from the NAR to the chunk + ChunkRef::insert(chunkref::ActiveModel { + nar_id: Set(nar_id), + seq: Set(chunk_idx as i32), + chunk_id: Set(Some(chunk.guard.id)), + chunk_hash: Set(chunk.guard.chunk_hash.clone()), + compression: Set(chunk.guard.compression.clone()), + ..Default::default() + }) + .exec(&database) + .await + .map_err(ServerError::database_error)?; + drop(permit); Ok(chunk) }) }); + + chunk_idx += 1; } // Confirm that the NAR Hash and Size are correct @@ -374,43 +430,16 @@ async fn upload_path_new_chunked( .await .map_err(ServerError::database_error)?; - // Create a NAR entry - let nar_id = { - let model = nar::ActiveModel { - state: Set(NarState::Valid), - compression: Set(compression.to_string()), - - nar_hash: Set(upload_info.nar_hash.to_typed_base16()), - nar_size: Set(nar_size_db), - - num_chunks: Set(chunks.len() as i32), - - created_at: Set(Utc::now()), - ..Default::default() - }; - - let insertion = Nar::insert(model) - .exec(&txn) - .await - .map_err(ServerError::database_error)?; - - insertion.last_insert_id - }; - - // Create mappings from the NAR to the chunks - for (i, chunk) in chunks.iter().enumerate() { - ChunkRef::insert(chunkref::ActiveModel { - nar_id: Set(nar_id), - seq: Set(i as i32), - chunk_id: Set(Some(chunk.guard.id)), - chunk_hash: Set(chunk.guard.chunk_hash.clone()), - compression: Set(chunk.guard.compression.clone()), - ..Default::default() - }) - .exec(&txn) - .await - .map_err(ServerError::database_error)?; - } + // Set num_chunks and mark the NAR as Valid + Nar::update(nar::ActiveModel { + id: Set(nar_id), + state: Set(NarState::Valid), + num_chunks: Set(chunks.len() as i32), + ..Default::default() + }) + .exec(&txn) + .await + .map_err(ServerError::database_error)?; // Create a mapping granting the local cache access to the NAR Object::delete_many() @@ -433,6 +462,8 @@ async fn upload_path_new_chunked( txn.commit().await.map_err(ServerError::database_error)?; + cleanup.cancel(); + Ok(Json(UploadPathResult { kind: UploadPathResultKind::Uploaded, file_size: Some(file_size),