Rid*_*del 4 asynchronous rust async-await
在我的RSS 阅读器项目中,我想异步阅读我的 RSS 提要。目前,由于此代码块,它们被同步读取
self.feeds = self
.feeds
.iter()
.map(|f| f.read(&self.settings))
.collect::<Vec<Feed>>();
Run Code Online (Sandbox Code Playgroud)
我想让该代码异步,因为它可以让我更好地处理糟糕的 Web 服务器响应。
我知道我可以使用Stream我可以从Vec使用stream::from_iter(...)中创建的将代码转换为类似的东西
self.feeds = stream::from_iter(self.feeds.iter())
.map(|f| f.read(&self.settings))
// ???
.collect::<Vec<Feed>>()
}
Run Code Online (Sandbox Code Playgroud)
但是,我有两个问题
Vec(这是一个同步结构)?task::spawn但它似乎不起作用......如何执行该流?我正在考虑使用
task::spawn但它似乎不起作用
在async/await世界上,异步代码旨在由执行程序执行,该执行程序不是标准库的一部分,而是由第三方 crate 提供,例如tokio. task::spawn只安排一个实例async fn运行,而不是实际运行它。
如何将结果加入 vec(这是一个同步结构)
你的 RSS 阅读器的面包和黄油似乎是f.read. 它应该变成一个异步函数。然后将提要向量映射到期货向量,需要轮询完成。
该futures箱具有futures::stream::futures_unordered::FuturesUnordered帮助你做到这一点。FuturesUnordered本身实现了Streamtrait。这个流然后被收集到结果向量中并await像这样完成:
//# tokio = { version = "0.2.4", features = ["full"] }
//# futures = "0.3.1"
use tokio::time::delay_for;
use futures::stream::StreamExt;
use futures::stream::futures_unordered::FuturesUnordered;
use std::error::Error;
use std::time::{Duration, Instant};
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let start = Instant::now();
let feeds = (0..10).collect::<Vec<_>>();
let res = read_feeds(feeds).await;
dbg!(res);
dbg!(start.elapsed());
Ok(())
}
async fn read_feeds(feeds: Vec<u32>) -> Vec<u32> {
feeds.iter()
.map(read_feed)
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await
}
async fn read_feed(feed: &u32) -> u32 {
delay_for(Duration::from_millis(500)).await;
feed * 2
}
Run Code Online (Sandbox Code Playgroud)
delay_for是模拟潜在的昂贵操作。它还有助于证明这些读数确实在没有任何显式线程相关逻辑的情况下同时发生。
这里有一个细微差别。与其同步对应物不同,读取 rss 提要的结果不再与提要本身的顺序相同,以第一个返回者为准。你需要以某种方式处理它。