如何在 Rust 中将字节迭代器转换为流

Ara*_*adi 5 rust async-await

我试图找出构建一个需要将文件内容读入的功能,futures::stream::BoxStream但我很难弄清楚我需要做什么。

我已经弄清楚如何通过实现迭代器的 Bytes 逐字节读取文件。

use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> Box<dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    Box::new(iter)
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    for b in async_read().into_iter() {
        println!("{:?}", b);
    }
}
Run Code Online (Sandbox Code Playgroud)

但是,我一直在苦苦挣扎,试图弄清楚如何将其Box<dyn Iterator<Item = Result<u8, std::io::Error>>>转换为Stream.

我会认为这样的事情会起作用:

use futures::stream;
use std::fs::File;
use std::io::prelude::*;
use std::io::{BufReader, Bytes};

// TODO: Convert this to a async Stream
fn async_read() -> stream::BoxStream<'static, dyn Iterator<Item = Result<u8, std::io::Error>>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);

    let iter = reader.bytes().into_iter();
    std::pin::Pin::new(Box::new(stream::iter(iter)))
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    while let Some(b) = async_read().poll() {
        println!("{:?}", b);
    }
}
Run Code Online (Sandbox Code Playgroud)

但是我不断收到大量编译器错误,我尝试过其他排列但通常无处可去。

编译器错误之一:

std::pin::Pin::new
``` --> src/main.rs:14:24
   |
14 |     std::pin::Pin::new(Box::new(stream::iter(iter)))
   |                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected trait object `dyn std::iter::Iterator`, found enum `std::result::Result`
Run Code Online (Sandbox Code Playgroud)

有人有什么建议吗?

我对 Rust 很陌生,特别是 Streams/低级的东西,所以如果我有任何错误,我很抱歉,请随时纠正我。

对于一些额外的背景,我正在尝试这样做,以便您可以CTRL-C使用 nushell 中的命令

Cod*_*256 6

我认为你有点过于复杂了,你可以直接impl Stream从返回async_read,不需要装箱或固定(原始Iterator版本也是如此)。然后,您需要设置一个异步运行时以轮询流(在本例中,我只使用 提供的运行时futures::executor::block_on)。然后你可以调用futures::stream::StreamExt::next()流来获取代表下一个项目的未来。

这是执行此操作的一种方法:

use futures::prelude::*;
use std::{
    fs::File,
    io::{prelude::*, BufReader},
};

fn async_read() -> impl Stream<Item = Result<u8, std::io::Error>> {
    let f = File::open("/dev/random").expect("Could not open file");
    let reader = BufReader::new(f);
    stream::iter(reader.bytes())
}

async fn async_main() {
    while let Some(b) = async_read().next().await {
        println!("{:?}", b);
    }
}

fn main() {
    ctrlc::set_handler(move || {
        println!("received Ctrl+C!");
        std::process::exit(0);
    })
    .expect("Error setting Ctrl-C handler");

    futures::executor::block_on(async_main());
}
Run Code Online (Sandbox Code Playgroud)

  • 你是一个美丽的人,我爱你 (2认同)