如何在不使用 tokio::spawn 的情况下循环运行多个 Tokio 异步任务?

TPR*_*eal 3 asynchronous rust rust-tokio

我制作了一个 LED 时钟,它还可以显示天气。我的程序在循环中执行几件不同的事情,每件事情都有不同的时间间隔:

  • 每 50 毫秒更新一次 LED,
  • 每 1 秒检查一次灯光级别(以调整亮度),
  • 每 10 分钟获取一次天气信息,
  • 实际上还有更多,但这无关紧要。

更新 LED 是最关键的:我不希望在获取天气等信息时延迟更新。这应该不是问题,因为获取天气主要是异步 HTTP 调用。

这是我的代码:

let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
    tokio::select! {
      _ = measure_light_stream.tick() => {
        let light = lm.get_light();
        light_smooth.sp = light;
      },
      _ = update_weather_stream.tick() => {
        let fetched_weather = weather_service.get(&config).await;
        // Store the fetched weather for later access from the displaying function.
        weather_clock.weather = fetched_weather.clone();
      },
      _ = update_leds_stream.tick() => {
        // Some code here that actually sets the LEDs.
        // This code accesses the weather_clock, the light level etc.
      },
    }
}
Run Code Online (Sandbox Code Playgroud)

我意识到代码没有执行我想要的操作 - 获取天气会阻止循环的执行。我明白为什么 - 文档tokio::select!说一旦表达式update_weather_stream.tick()完成,其他分支就会被取消。

我该如何做到这一点,以便在网络上等待获取天气时,LED 仍会更新?我发现我可以用来tokio::spawn启动一个单独的非阻塞“线程”来获取天气,但是后来我遇到了weather_service不存在的问题Send,更不用说weather_clock不能在线程之间共享了。我不想要这种复杂化,我对所有在单个线程中运行的东西都满意,就像它一样select!

可重现的例子

use std::time::Duration;
use tokio::time::{interval, sleep};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut slow_stream = interval(Duration::from_secs(3));
    let mut fast_stream = interval(Duration::from_millis(200));
    // Note how access to this data is straightforward, I do not want
    // this to get more complicated, e.g. care about threads and Send.
    let mut val = 1;
    loop {
        tokio::select! {
          _ = fast_stream.tick() => {
            println!(".{}", val);
          },
          _ = slow_stream.tick() => {
            println!("Starting slow operation...");
            // The problem: During this await the dots are not printed.
            sleep(Duration::from_secs(1)).await;
            val += 1;
            println!("...done");
          },
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

ste*_*pan 7

您可以用于tokio::join!在同一任务中同时运行多个异步操作。

这是一个例子:

async fn measure_light(halt: &Cell<bool>) {
    while !halt.get() {
        let light = lm.get_light();
        // ....

        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}

async fn blink_led(halt: &Cell<bool>) {
    while !halt.get() {
        // LED blinking code

        tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
    }
}

async fn poll_weather(halt: &Cell<bool>) {
    while !halt.get() {
        let weather = weather_service.get(&config).await;
        // ...

        tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
    }
}

// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
    tokio::time::sleep(Duration::from_secs(10)).await;
    halt.set(true);
}

async fn main() {
    let halt = Cell::new(false);
    tokio::join!(
        measure_light(&halt),
        blink_led(&halt),
        poll_weather(&halt),
        terminate(&halt),
    );
}
Run Code Online (Sandbox Code Playgroud)

如果您正在使用tokio::TcpStreamIO 或其他非阻塞 IO,那么它应该允许并发执行。

Cell作为示例,我添加了一个用于停止执行的标志。您可以使用相同的技术在连接分支之间共享任何可变状态。


编辑:同样的事情可以用 完成tokio::select!。与您的代码的主要区别在于,实际的“业务逻辑”位于 等待的 future 内select

select允许您删除未完成的 future,而不是等待它们自行退出(因此halt不需要终止标志)。

async fn main() {
    tokio::select! {
        _ = measure_light() => {},
        _ = blink_led() = {},
        _ = poll_weather() => {},
    }
}
Run Code Online (Sandbox Code Playgroud)