有没有办法创建一个异步流生成器来产生重复调用函数的结果?

pek*_*u33 3 asynchronous rust async-await rust-tokio

我想构建一个收集天气更新并将它们表示为流的程序。我想get_weather()在一个无限循环中调用,在finishstart之间有 60 秒的延迟。

简化版本如下所示:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}
Run Code Online (Sandbox Code Playgroud)

有没有办法轻松做到这一点?

超过 60 秒tokio::timer::Interval时使用将不起作用get_weather()

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}
Run Code Online (Sandbox Code Playgroud)

如果发生这种情况,下一个功能将立即启动。我想在上一次get_weather()开始和下一次get_weather()开始之间保持 60 秒。

She*_*ter 5

用于stream::unfold从“未来世界”到“流世界”。我们不需要任何额外的状态,所以我们使用空元组:

use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}
Run Code Online (Sandbox Code Playgroud)
use futures::StreamExt; // 0.3.4
use std::time::Duration;
use tokio::time; // 0.2.11

struct Weather;

async fn get_weather() -> Weather {
    Weather
}

const BETWEEN: Duration = Duration::from_secs(1);

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    futures::stream::unfold((), |_| async {
        time::delay_for(BETWEEN).await;
        let weather = get_weather().await;
        Some((weather, ()))
    })
}

#[tokio::main]
async fn main() {
    get_weather_stream()
        .take(3)
        .for_each(|_v| async {
            println!("Got the weather");
        })
        .await;
}
Run Code Online (Sandbox Code Playgroud)

也可以看看: