如何使 tokio 流同时工作

Mat*_*ric -1 asynchronous rust rust-tokio

我想运行一系列流,其中多个流进程同时运行。我尝试的测试代码如下:

use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .then(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .chunks_timeout(3, Duration::from_secs(20));

    let _result : Vec<_> = stream
        .collect().await;
}
Run Code Online (Sandbox Code Playgroud)

此代码运行,但它会一一打印 10 个值,并有 1 秒的延迟。这与并发相反。我期望的是等待 3 秒,打印 3 个数字,然后等待 1 秒,依此类推。我认为这tokio::time::sleep很好,因为join_all我让代码同时工作。

那么,如何解释缺乏并发性以及如何解决呢?

kmd*_*eko 5

流只是异步迭代器并且是​​惰性的,除非您要求,否则它们不包含额外的每个元素并发性。

tokio-stream的流媒体功能与futures crate(可能更常见)的流媒体功能不同。他们都使用相同的潜在Stream特征。差异在于各自的StreamExt特质;来自 future 板条箱的功能更全面、更通用,而来自 tokio-stream 的则更精简并且面向 tokio(尤其是时间安排)。

您将需要使用futures 箱中的.buffered()/ buffered_unordered(),因为这些方法允许流一次缓冲多个待处理的 future(从而允许它们同时运行),但您仍然需要.chunks_timeout()from tokio-stream。不幸的是,你不能在范围内同时拥有这两个特征,因为许多调用如.then()or.map()是不明确的;最好的做法是遵循 tokio-stream 的建议,并为其中之一使用完全限定的语法。

这是一个完整的示例(由于工作方式而从 更改.then()为):.map().buffered()

use futures::stream::{self, StreamExt};
use std::time::Duration;
use tokio;

#[tokio::main]
async fn main() {
    let stream = stream::iter(0..10)
        .map(|i| async move {
            tokio::time::sleep(Duration::from_secs(1)).await;
            println!("i={:?}", i);
        })
        .buffered(5);

    let stream = tokio_stream::StreamExt::chunks_timeout(stream, 3, Duration::from_secs(20));

    let _result: Vec<_> = stream.collect().await;
}
Run Code Online (Sandbox Code Playgroud)

该代码将在大约两秒内完成,因为它将同时运行十个睡眠中的五个。