server: Use the same read_chunk_async implementation

This commit is contained in:
Zhaofeng Li 2023-01-29 12:01:54 -07:00
parent 33d8dfabbd
commit 00c3024c41
3 changed files with 15 additions and 37 deletions

View file

@ -8,7 +8,7 @@ use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_stream::try_stream; use async_stream::try_stream;
use bytes::Bytes; use bytes::{Bytes, BytesMut};
use digest::{Digest, Output as DigestOutput}; use digest::{Digest, Output as DigestOutput};
use futures::stream::{BoxStream, Stream, StreamExt}; use futures::stream::{BoxStream, Stream, StreamExt};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
@ -152,32 +152,20 @@ impl<R: AsyncRead + Unpin, D: Digest + Unpin> AsyncRead for StreamHasher<R, D> {
} }
} }
/// Greedily reads from a stream for some number of bytes. /// Greedily reads from a stream to fill a buffer.
///
/// This was originally from rust-s3 but completely rewritten to resolve
/// performance problems.
pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>( pub async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
stream: &mut S, stream: &mut S,
max_size: usize, mut chunk: BytesMut,
) -> std::io::Result<Vec<u8>> { ) -> std::io::Result<Bytes> {
let mut chunk: Box<[u8]> = vec![0u8; max_size].into_boxed_slice(); while chunk.len() < chunk.capacity() {
let mut cursor = 0; let read = stream.read_buf(&mut chunk).await?;
while cursor < max_size {
let buf = &mut chunk[cursor..];
let read = stream.read(buf).await?;
if read == 0 { if read == 0 {
break; break;
} else {
cursor += read;
} }
} }
let mut vec = chunk.into_vec(); Ok(chunk.freeze())
vec.truncate(cursor);
Ok(vec)
} }
#[cfg(test)] #[cfg(test)]

View file

@ -7,7 +7,9 @@ use async_stream::try_stream;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use fastcdc::FastCDC; use fastcdc::FastCDC;
use futures::stream::Stream; use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt}; use tokio::io::AsyncRead;
use attic::stream::read_chunk_async;
/// Splits a streams into content-defined chunks. /// Splits a streams into content-defined chunks.
/// ///
@ -63,21 +65,6 @@ where
Box::pin(s) Box::pin(s)
} }
async fn read_chunk_async<S: AsyncRead + Unpin + Send>(
stream: &mut S,
mut chunk: BytesMut,
) -> std::io::Result<Bytes> {
while chunk.len() < chunk.capacity() {
let read = stream.read_buf(&mut chunk).await?;
if read == 0 {
break;
}
}
Ok(chunk.freeze())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View file

@ -11,6 +11,7 @@ use aws_sdk_s3::{
presigning::config::PresigningConfig, presigning::config::PresigningConfig,
Client, Credentials, Endpoint, Region, Client, Credentials, Endpoint, Region,
}; };
use bytes::BytesMut;
use futures::future::join_all; use futures::future::join_all;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -171,7 +172,8 @@ impl StorageBackend for S3Backend {
name: String, name: String,
mut stream: &mut (dyn AsyncRead + Unpin + Send), mut stream: &mut (dyn AsyncRead + Unpin + Send),
) -> ServerResult<RemoteFile> { ) -> ServerResult<RemoteFile> {
let first_chunk = read_chunk_async(&mut stream, CHUNK_SIZE) let buf = BytesMut::with_capacity(CHUNK_SIZE);
let first_chunk = read_chunk_async(&mut stream, buf)
.await .await
.map_err(ServerError::storage_error)?; .map_err(ServerError::storage_error)?;
@ -238,7 +240,8 @@ impl StorageBackend for S3Backend {
let chunk = if part_number == 1 { let chunk = if part_number == 1 {
first_chunk.take().unwrap() first_chunk.take().unwrap()
} else { } else {
read_chunk_async(&mut stream, CHUNK_SIZE) let buf = BytesMut::with_capacity(CHUNK_SIZE);
read_chunk_async(&mut stream, buf)
.await .await
.map_err(ServerError::storage_error)? .map_err(ServerError::storage_error)?
}; };