diff --git a/attic/src/stream.rs b/attic/src/stream.rs index 21908fa..fede7b8 100644 --- a/attic/src/stream.rs +++ b/attic/src/stream.rs @@ -8,7 +8,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_stream::try_stream; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use digest::{Digest, Output as DigestOutput}; use futures::stream::{BoxStream, Stream, StreamExt}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -152,32 +152,20 @@ impl AsyncRead for StreamHasher { } } -/// Greedily reads from a stream for some number of bytes. -/// -/// This was originally from rust-s3 but completely rewritten to resolve -/// performance problems. +/// Greedily reads from a stream to fill a buffer. pub async fn read_chunk_async( stream: &mut S, - max_size: usize, -) -> std::io::Result> { - let mut chunk: Box<[u8]> = vec![0u8; max_size].into_boxed_slice(); - let mut cursor = 0; - - while cursor < max_size { - let buf = &mut chunk[cursor..]; - let read = stream.read(buf).await?; + mut chunk: BytesMut, +) -> std::io::Result { + while chunk.len() < chunk.capacity() { + let read = stream.read_buf(&mut chunk).await?; if read == 0 { break; - } else { - cursor += read; } } - let mut vec = chunk.into_vec(); - vec.truncate(cursor); - - Ok(vec) + Ok(chunk.freeze()) } #[cfg(test)] diff --git a/server/src/chunking/mod.rs b/server/src/chunking/mod.rs index 1cce537..1bc7f94 100644 --- a/server/src/chunking/mod.rs +++ b/server/src/chunking/mod.rs @@ -7,7 +7,9 @@ use async_stream::try_stream; use bytes::{BufMut, Bytes, BytesMut}; use fastcdc::FastCDC; use futures::stream::Stream; -use tokio::io::{AsyncRead, AsyncReadExt}; +use tokio::io::AsyncRead; + +use attic::stream::read_chunk_async; /// Splits a streams into content-defined chunks. /// @@ -63,21 +65,6 @@ where Box::pin(s) } -async fn read_chunk_async( - stream: &mut S, - mut chunk: BytesMut, -) -> std::io::Result { - while chunk.len() < chunk.capacity() { - let read = stream.read_buf(&mut chunk).await?; - - if read == 0 { - break; - } - } - - Ok(chunk.freeze()) -} - #[cfg(test)] mod tests { use super::*; diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index bec687b..9b08655 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -11,6 +11,7 @@ use aws_sdk_s3::{ presigning::config::PresigningConfig, Client, Credentials, Endpoint, Region, }; +use bytes::BytesMut; use futures::future::join_all; use futures::stream::StreamExt; use serde::{Deserialize, Serialize}; @@ -171,7 +172,8 @@ impl StorageBackend for S3Backend { name: String, mut stream: &mut (dyn AsyncRead + Unpin + Send), ) -> ServerResult { - let first_chunk = read_chunk_async(&mut stream, CHUNK_SIZE) + let buf = BytesMut::with_capacity(CHUNK_SIZE); + let first_chunk = read_chunk_async(&mut stream, buf) .await .map_err(ServerError::storage_error)?; @@ -238,7 +240,8 @@ impl StorageBackend for S3Backend { let chunk = if part_number == 1 { first_chunk.take().unwrap() } else { - read_chunk_async(&mut stream, CHUNK_SIZE) + let buf = BytesMut::with_capacity(CHUNK_SIZE); + read_chunk_async(&mut stream, buf) .await .map_err(ServerError::storage_error)? };