Mat*_*ric -1 asynchronous rust rust-tokio
我想运行一系列流,其中多个流进程同时运行。我尝试的测试代码如下:
use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
let stream = stream::iter(0..10)
.then(|i| async move {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("i={:?}", i);
})
.chunks_timeout(3, Duration::from_secs(20));
let _result : Vec<_> = stream
.collect().await;
}
Run Code Online (Sandbox Code Playgroud)
此代码运行,但它会一一打印 10 个值,并有 1 秒的延迟。这与并发相反。我期望的是等待 3 秒,打印 3 个数字,然后等待 1 秒,依此类推。我认为这tokio::time::sleep很好,因为join_all我让代码同时工作。
那么,如何解释缺乏并发性以及如何解决呢?
流只是异步迭代器并且是惰性的,除非您要求,否则它们不包含额外的每个元素并发性。
tokio-stream的流媒体功能与futures crate(可能更常见)的流媒体功能不同。他们都使用相同的潜在Stream特征。差异在于各自的StreamExt特质;来自 future 板条箱的功能更全面、更通用,而来自 tokio-stream 的则更精简并且面向 tokio(尤其是时间安排)。
您将需要使用futures 箱中的.buffered()/ buffered_unordered(),因为这些方法允许流一次缓冲多个待处理的 future(从而允许它们同时运行),但您仍然需要.chunks_timeout()from tokio-stream。不幸的是,你不能在范围内同时拥有这两个特征,因为许多调用如.then()or.map()是不明确的;最好的做法是遵循 tokio-stream 的建议,并为其中之一使用完全限定的语法。
这是一个完整的示例(由于工作方式而从 更改.then()为):.map().buffered()
use futures::stream::{self, StreamExt};
use std::time::Duration;
use tokio;
#[tokio::main]
async fn main() {
let stream = stream::iter(0..10)
.map(|i| async move {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("i={:?}", i);
})
.buffered(5);
let stream = tokio_stream::StreamExt::chunks_timeout(stream, 3, Duration::from_secs(20));
let _result: Vec<_> = stream.collect().await;
}
Run Code Online (Sandbox Code Playgroud)
该代码将在大约两秒内完成,因为它将同时运行十个睡眠中的五个。
| 归档时间: |
|
| 查看次数: |
1194 次 |
| 最近记录: |