bru*_*uan 4 rust rust-tokio tokio
想象一下,一些 future 存储在一个Vec长度由运行时决定的 a 中,你应该同时加入这些 future,你应该怎么做?
显然,通过文档中的示例tokio::join,手动指定每个长度Vec可能是,如 1,2,3,... 并处理可观的案例应该工作。
extern crate tokio;
let v = Vec::new();
v.push(future_1);
// directly or indirectly you push many futures to the vector
v.push(future_N);
// to join these futures concurrently one possible way is
if v.len() == 0 {}
if v.len() == 1 { join!(v.pop()); }
if v.len() == 2 { join!(v.pop(), v.pop() ); }
// ...
Run Code Online (Sandbox Code Playgroud)
我还注意到 tokio::join! 当我使用类似的语法时,将列表作为文档中的参数
tokio::join!(v);
Run Code Online (Sandbox Code Playgroud)
或类似的东西
tokio::join![ v ] / tokio::join![ v[..] ] / tokio::join![ v[..][..] ]
Run Code Online (Sandbox Code Playgroud)
它只是不起作用
那么问题来了,是否有任何途径可以更有效地加入这些期货,或者我是否应该错过与文件所说的内容相反的内容?
mza*_*uev 29
join_all和try_join_all,以及同一个箱子中的更多功能FuturesOrdered和实用程序,被作为一个组合的 future 进行轮询。如果组成的 future 很简单并且通常不会同时准备好执行工作,那么这可能没问题,但以这种方式组合 future 存在两个潜在问题。首先,您将无法在多线程运行时中利用 CPU 并行性。其次,如果 future 使用共享同步原语,则可能会出现死锁。如果确实出现这些问题,请考虑将各个 future 作为单独的任务生成,并等待任务完成。FuturesUnorderedfutures
通过最近的 Tokio 版本,您可以使用JoinSet以获得最大的灵活性,包括中止所有任务的能力。JoinSet删除时,集中的任务也会中止。
use tokio::task::JoinSet;
let mut set = JoinSet::new();
for fut in v {
set.spawn(fut);
}
while let Some(res) = set.join_next().await {
let out = res?;
// ...
}
Run Code Online (Sandbox Code Playgroud)
使用连接句柄生成任务tokio::spawn并等待连接句柄:
use futures::future;
// ...
let outputs = future::try_join_all(v.into_iter().map(tokio::spawn)).await?;
Run Code Online (Sandbox Code Playgroud)
您还可以使用FuturesOrdered和FuturesUnordered组合器在流中异步处理输出:
use futures::stream::FuturesUnordered;
use futures::prelude::*;
// ...
let mut completion_stream = v.into_iter()
.map(tokio::spawn)
.collect::<FuturesUnordered<_>>();
while let Some(res) = completion_stream.next().await {
// ...
}
Run Code Online (Sandbox Code Playgroud)
以这种方式等待任务的一个警告是,当生成任务并可能拥有返回值的 future(例如异步块)被删除时,任务不会被取消JoinHandle。JoinHandle::abort需要使用该方法显式取消任务。
一个完整的例子:
#[tokio::main]
async fn main() {
let tasks = (0..5).map(|i| tokio::spawn(async move {
sleep(Duration::from_secs(1)).await; // simulate some work
i * 2
})).collect::<FuturesUnordered<_>>();
let result = futures::future::join_all(tasks).await;
println!("{:?}", result); // [Ok(8), Ok(6), Ok(4), Ok(2), Ok(0)]
}
Run Code Online (Sandbox Code Playgroud)