server/upload_path: Create chunkrefs during the upload

This commit is contained in:
Zhaofeng Li 2023-01-17 14:10:27 -07:00
parent 5b42839f58
commit 8b52796dcb

View file

@ -296,6 +296,45 @@ async fn upload_path_new_chunked(
let nar_size_db = i64::try_from(upload_info.nar_size).map_err(ServerError::request_error)?;
// Create a pending NAR entry
let nar_id = {
let model = nar::ActiveModel {
state: Set(NarState::PendingUpload),
compression: Set(compression.to_string()),
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(nar_size_db),
num_chunks: Set(0),
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Nar::insert(model)
.exec(database)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
let cleanup = Finally::new({
let database = database.clone();
let nar_model = nar::ActiveModel {
id: Set(nar_id),
..Default::default()
};
async move {
tracing::warn!("Error occurred - Cleaning up NAR entry");
if let Err(e) = Nar::delete(nar_model).exec(&database).await {
tracing::warn!("Failed to unregister failed NAR: {}", e);
}
}
});
// FIXME: Maybe the client will send much more data than claimed
let (stream, nar_compute) = StreamHasher::new(stream, Sha256::new());
let mut chunks = chunk_stream(
@ -308,6 +347,7 @@ async fn upload_path_new_chunked(
let upload_chunk_limit = Arc::new(Semaphore::new(CONCURRENT_CHUNK_UPLOADS));
let mut futures = Vec::new();
let mut chunk_idx = 0;
while let Some(bytes) = chunks.next().await {
let bytes = bytes.map_err(ServerError::request_error)?;
let data = ChunkData::Bytes(bytes);
@ -327,15 +367,31 @@ async fn upload_path_new_chunked(
data,
compression_type,
compression_level,
database,
database.clone(),
state,
require_proof_of_possession,
)
.await?;
// Create mapping from the NAR to the chunk
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(chunk_idx as i32),
chunk_id: Set(Some(chunk.guard.id)),
chunk_hash: Set(chunk.guard.chunk_hash.clone()),
compression: Set(chunk.guard.compression.clone()),
..Default::default()
})
.exec(&database)
.await
.map_err(ServerError::database_error)?;
drop(permit);
Ok(chunk)
})
});
chunk_idx += 1;
}
// Confirm that the NAR Hash and Size are correct
@ -374,43 +430,16 @@ async fn upload_path_new_chunked(
.await
.map_err(ServerError::database_error)?;
// Create a NAR entry
let nar_id = {
let model = nar::ActiveModel {
// Set num_chunks and mark the NAR as Valid
Nar::update(nar::ActiveModel {
id: Set(nar_id),
state: Set(NarState::Valid),
compression: Set(compression.to_string()),
nar_hash: Set(upload_info.nar_hash.to_typed_base16()),
nar_size: Set(nar_size_db),
num_chunks: Set(chunks.len() as i32),
created_at: Set(Utc::now()),
..Default::default()
};
let insertion = Nar::insert(model)
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
insertion.last_insert_id
};
// Create mappings from the NAR to the chunks
for (i, chunk) in chunks.iter().enumerate() {
ChunkRef::insert(chunkref::ActiveModel {
nar_id: Set(nar_id),
seq: Set(i as i32),
chunk_id: Set(Some(chunk.guard.id)),
chunk_hash: Set(chunk.guard.chunk_hash.clone()),
compression: Set(chunk.guard.compression.clone()),
..Default::default()
})
.exec(&txn)
.await
.map_err(ServerError::database_error)?;
}
// Create a mapping granting the local cache access to the NAR
Object::delete_many()
@ -433,6 +462,8 @@ async fn upload_path_new_chunked(
txn.commit().await.map_err(ServerError::database_error)?;
cleanup.cancel();
Ok(Json(UploadPathResult {
kind: UploadPathResultKind::Uploaded,
file_size: Some(file_size),