使用 rusoto 流式上传到 s3

dir*_*lik 8 rust rusoto

如何使用rusoto将文件上传到 s3 ,而不将文件内容读取到内存(流式传输)?


使用此代码:

use std::fs::File;
use std::io::BufReader;

use rusoto_core::Region;
use rusoto_s3::{PutObjectRequest, S3, S3Client, StreamingBody};

fn main() {
    let file = File::open("input.txt").unwrap();
    let mut reader = BufReader::new(file);

    let s3_client = S3Client::new(Region::UsEast1);
    let result = s3_client.put_object(PutObjectRequest {
        bucket: String::from("example_bucket"),
        key: "example_filename".to_string(),
//        this works:
//      body: Some("example string".to_owned().into_bytes().into()),
//        this doesn't:
        body: Some(StreamingBody::new(reader)),
        ..Default::default()
    }).sync().expect("could not upload");
}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

error[E0277]: the trait bound `std::io::BufReader<std::fs::File>: futures::stream::Stream` is not satisfied
  --> src/bin/example.rs:18:20
   |
18 |         body: Some(StreamingBody::new(reader)),
   |                    ^^^^^^^^^^^^^^^^^^ the trait `futures::stream::Stream` is not implemented for `std::io::BufReader<std::fs::File>`
   |
   = note: required by `rusoto_core::stream::ByteStream::new`
Run Code Online (Sandbox Code Playgroud)

Séb*_*uld 20

好的。把自己绑起来,这是一个有趣的。

StreamingBody是 的别名ByteStream,它本身采用参数类型S: Stream<Item = Bytes, Error = Error> + Send + 'static。简而言之,它需要是一个字节流。

BufReader,显然,没有实现这个特性,因为它早于期货和流很长一段时间。也没有简单的转换Stream<Item = Bytes>可以用来隐式转换为 this。

第一个(注释)示例之所以有效,是因为String::into_bytes().into()将遵循类型转换链:String-> Vec<u8>->ByteStream感谢From<Vec<u8>>on的实现ByteStream

现在我们知道为什么这不起作用,我们可以修复它。有一个快速的方法,然后有一个正确的方法。我会告诉你两个。

快捷方式

快速(但不是最佳)的方法是简单地调用File::read_to_end(). 这将填充一个Vec<u8>,然后您可以像以前一样使用它:

 let mut buf:Vec<u8> = vec![];
 file.read_to_end(&mut buf)?;
 // buf now contains the entire file
Run Code Online (Sandbox Code Playgroud)

由于两个原因,这是低效和次优的:

  • read_to_end()是阻塞调用。根据您从何处读取文件,此阻塞时间可能被证明是不合理的
  • 您需要拥有比文件中的字节数更多的可用 RAM(+ 64 位或 128 位用于Vec定义+ 一些我们并不真正关心的额外内容)

好办法

好方法将您的文件转换为实现AsyncRead. 由此,我们可以形成一个Stream.

由于您已经有了std::fs::File,我们将首先将其转换为tokio::fs::File。这实现了AsyncRead,这对于以后非常重要:

let tokio_file = tokio::fs::File::from_std(file);
Run Code Online (Sandbox Code Playgroud)

从此,我们遗憾地需要做一些管道工作才能将其放入Stream. 多个板条箱已经实现了它;从头开始的方法如下:

use tokio_util::codec;
let byte_stream = codec::FramedRead::new(tokio_file, codec::BytesCodec::new())
   .map(|r| r.as_ref().to_vec());
Run Code Online (Sandbox Code Playgroud)

byte_stream是的一个实例,tokio_util::codec::FramedRead其中工具Stream基于我们的解码器与特定项目。正如我们的解码器一样BytesCodec,因此您的流是Stream<Item = BytesMut>.

由于操场不知道rusoto_core,我无法向您展示完整流程。但是,我可以向您展示您可以生成一个Stream<Item = Vec<u8>, Error = io::Error>,这是其中的关键:https : //play.rust-lang.org/? version = stable & mode = debug & edition = 2018 & gist =38e4ae8be0d70abd134b5331d6bf4133


bra*_*orm 6

这是一个带有即将到来的 Rusoto async-await 语法的版本(对于 getObject 虽然应该很容易调整上传)......可能会在 Rusoto 0.4.3 中供公众使用:

https://github.com/brainstorm/rusoto-s3-async-await

即:

pub async fn bucket_obj_bytes(client: S3Client, bucket: String, _prefix: String, object: String) {
    let get_req = GetObjectRequest {
        bucket,
        key: object,
        ..Default::default()
    };

    let result = client
        .get_object(get_req)
        .await
        .expect("Couldn't GET object");
    println!("get object result: {:#?}", result);

    let stream = result.body.unwrap();
    let body = stream.map_ok(|b| BytesMut::from(&b[..])).try_concat().await.unwrap();

    assert!(body.len() > 0);
    dbg!(body);
}
Run Code Online (Sandbox Code Playgroud)

这实质上是从集成测试套件本身借来的,您也可以在其中找到上传版本的片段