如何通过读取和转换文件来创建流?

use*_*732 4 rust hyper rust-tokio

我正在尝试读取文件、解密它并返回数据。因为文件可能非常大,所以我想在流中执行此操作。

我找不到一个好的模式来实现流。我正在尝试做这样的事情:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});
Run Code Online (Sandbox Code Playgroud)

问题是f.read上面的异步闭包中不允许这样做,并出现以下错误:

let stream = stream::unfold(decrypted_init_length, |decrypted_length| async move {
    if decrypted_length < start + length {
        let mut encrypted_chunk = vec![0u8; encrypted_block_size];
        match f.read(&mut encrypted_chunk[..]) {
            Ok(size) => {
                if size > 0 {
                    let decrypted = my_decrypt_fn(&encrypted_chunk[..]);
                    let updated_decrypted_length = decrypted_length + decrypted.len();
                    Some((decrypted, updated_decrypted_length))
                } else {
                    None
                }
            }
            Err(e) => {
                println!("Error {}", e);
                None
            }
        }
    } else {
        None
    }
});
Run Code Online (Sandbox Code Playgroud)

我不想f在封闭物内部打开。有没有更好的方法来解决这个问题?我可以使用不同的板条箱或特征或方法(即不stream::unfold)。

use*_*732 5

我找到了一个解决方案:在这里使用async-stream板条箱。

stream::unfold对我不起作用的原因之一是async move闭包不允许访问mut外部变量,例如f文件句柄。

现在async-stream,我将代码更改为以下内容,并且它可以工作:(请注意yield此板条箱添加的内容)。

use async_stream::try_stream;

<snip>

    try_stream! {
        while decrypted_length < start + length {
            match f.read(&mut encrypted_chunk[..]) {
                Ok(size) => 
                    if size > 0 {
                        println!("read {} bytes", size);
                        let decrypted = my_decrypt_fn(&encrypted_chunk[..size], ..);
                        decrypted_length = decrypted_length + decrypted.len();
                        yield decrypted;
                    } else {
                        break
                    }
                Err(e) => {
                    println!("Error {}", e);
                    break
                }
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

更新:

我发现它async-stream有一些我不能忽视的局限性。我最终Stream直接实现并且不再使用async-stream. 现在我的代码如下所示:

pub struct DecryptFileStream {
    f: File,
    <other_fields>,
}

impl Stream for DecryptFileStream {
    type Item = io::Result<Vec<u8>>;

    fn poll_next(self: Pin<&mut Self>,
                  _cx: &mut Context<'_>) -> Poll<Option<io::Result<Vec<u8>>>> {
         // read the file `f` of self and business_logic
         // 
         if decrypted.len() > 0 {
             Poll::Ready(Some(Ok(decrypted)))
         } else {
             Poll::Ready(None)
         }
    }
}

//. then use the above stream: 

    let stream = DecryptFileStream::new(...);
    Response::new(Body::wrap_stream(stream))
Run Code Online (Sandbox Code Playgroud)