是否可以从文件创建流而不是将文件内容加载到内存中?

dal*_*ook 4 rust

我目前正在使用rusoto_s3lib 将文件上传到 S3。我发现的所有示例都做同样的事情:打开一个文件,将文件的全部内容读入内存(Vec<u8>),然后将 Vec 转换为 a ByteStream(实现From<Vec<u8>>)。这是一个代码示例:

fn upload_file(&self, file_path: &Path) -> FileResult<PutObjectOutput> {
    let mut file = File::open(file_path)?;
    let mut file_data: Vec<u8> = vec![];
    file.read_to_end(&mut file_data)?;

    let client = S3Client::new(Region::UsEast1);
    let mut request = PutObjectRequest::default();
    request.body = Some(file_data.into());

    Ok(client.put_object(request).sync()?)
}
Run Code Online (Sandbox Code Playgroud)

这对于小文件来说可能是可以接受的,但是(我假设)一旦您尝试上传大小大于可用堆内存的文件,这种技术就会崩溃。

创建 a 的另一种方法ByteStream使用这个初始化器,它接受一个实现Streamtrait的对象。我认为这File会实现这个特性,但事实并非如此。

我的问题:

是否有某种类型可以从一个Filewhich 实现Stream?制作我自己的元组结构的正确解决方案是包装File和实现Stream自身吗,这个实现是微不足道的吗?有没有我没有看到的另一个解决方案,或者我只是误解了上面代码中的内存分配方式?

edw*_*rdw 5

是否有某种类型可以从实现流的文件构造?

不,不幸的是。没有内置的东西stdfutures或者现在tokio可以直接做到这一点。

由于 Stream 项的“分离”性质,这样的实现必须为传入数据的每个切片分配一个新的拥有缓冲区,并将其移交给调用者。那效率不会很高。直到 Rust 语言具有泛型关联类型(GAT),希望明年能实现,我们才能令人满意地解决这个问题。查看这张futures-rsNiko 的异步采访 #2了解更多细节。

话虽如此,现在有一些用例Stream在底层 IO 之上的外观是可取的并且足够好。

制作我自己的元组结构的正确解决方案是包装 File 并实现 Stream 本身,这个实现是微不足道的吗?

对于futures-0.1rusoto依赖,也有实现这个几个方面:

  • Stream为包装一个的结构实现特征Read
  • 利用futures效用函数,例如futures::stream::poll_fn
  • tokio-codec-0.1有一个FramedRead已经实施的优秀Stream

第三种肯定是最简单的:

use futures::stream::Stream;  // futures = "0.1.29"
use rusoto_core::{ByteStream, Region};  // rusoto_core = "0.42.0"
use rusoto_s3::{PutObjectOutput, PutObjectRequest, S3Client, S3};  // rusoto_s3 = "0.42.0"
use std::{error::Error, fs::File, path::Path};
use tokio_codec::{BytesCodec, FramedRead};  // tokio-codec = "0.1.1"
use tokio_io::io::AllowStdIo;  // tokio-io = "0.1.12"

fn upload_file(file_path: &Path) -> Result<PutObjectOutput, Box<dyn Error>> {
    let file = File::open(file_path)?;
    let aio = AllowStdIo::new(file);
    let stream = FramedRead::new(aio, BytesCodec::new()).map(|bs| bs.freeze());

    let client = S3Client::new(Region::UsEast1);
    let mut request = PutObjectRequest::default();
    request.body = Some(ByteStream::new(stream));

    Ok(client.put_object(request).sync()?)
}
Run Code Online (Sandbox Code Playgroud)