TPR*_*eal 3 asynchronous rust rust-tokio
我制作了一个 LED 时钟,它还可以显示天气。我的程序在循环中执行几件不同的事情,每件事情都有不同的时间间隔:
更新 LED 是最关键的:我不希望在获取天气等信息时延迟更新。这应该不是问题,因为获取天气主要是异步 HTTP 调用。
这是我的代码:
let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
tokio::select! {
_ = measure_light_stream.tick() => {
let light = lm.get_light();
light_smooth.sp = light;
},
_ = update_weather_stream.tick() => {
let fetched_weather = weather_service.get(&config).await;
// Store the fetched weather for later access from the displaying function.
weather_clock.weather = fetched_weather.clone();
},
_ = update_leds_stream.tick() => {
// Some code here that actually sets the LEDs.
// This code accesses the weather_clock, the light level etc.
},
}
}
Run Code Online (Sandbox Code Playgroud)
我意识到代码没有执行我想要的操作 - 获取天气会阻止循环的执行。我明白为什么 - 文档tokio::select!说一旦表达式update_weather_stream.tick()完成,其他分支就会被取消。
我该如何做到这一点,以便在网络上等待获取天气时,LED 仍会更新?我发现我可以用来tokio::spawn启动一个单独的非阻塞“线程”来获取天气,但是后来我遇到了weather_service不存在的问题Send,更不用说weather_clock不能在线程之间共享了。我不想要这种复杂化,我对所有在单个线程中运行的东西都满意,就像它一样select!。
use std::time::Duration;
use tokio::time::{interval, sleep};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut slow_stream = interval(Duration::from_secs(3));
let mut fast_stream = interval(Duration::from_millis(200));
// Note how access to this data is straightforward, I do not want
// this to get more complicated, e.g. care about threads and Send.
let mut val = 1;
loop {
tokio::select! {
_ = fast_stream.tick() => {
println!(".{}", val);
},
_ = slow_stream.tick() => {
println!("Starting slow operation...");
// The problem: During this await the dots are not printed.
sleep(Duration::from_secs(1)).await;
val += 1;
println!("...done");
},
}
}
}
Run Code Online (Sandbox Code Playgroud)
您可以用于tokio::join!在同一任务中同时运行多个异步操作。
这是一个例子:
async fn measure_light(halt: &Cell<bool>) {
while !halt.get() {
let light = lm.get_light();
// ....
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn blink_led(halt: &Cell<bool>) {
while !halt.get() {
// LED blinking code
tokio::time::sleep(UPDATE_LEDS_INTERVAL).await;
}
}
async fn poll_weather(halt: &Cell<bool>) {
while !halt.get() {
let weather = weather_service.get(&config).await;
// ...
tokio::time::sleep(WEATHER_FETCH_INTERVAL).await;
}
}
// example on how to terminate execution
async fn terminate(halt: &Cell<bool>) {
tokio::time::sleep(Duration::from_secs(10)).await;
halt.set(true);
}
async fn main() {
let halt = Cell::new(false);
tokio::join!(
measure_light(&halt),
blink_led(&halt),
poll_weather(&halt),
terminate(&halt),
);
}
Run Code Online (Sandbox Code Playgroud)
如果您正在使用tokio::TcpStreamIO 或其他非阻塞 IO,那么它应该允许并发执行。
Cell作为示例,我添加了一个用于停止执行的标志。您可以使用相同的技术在连接分支之间共享任何可变状态。
编辑:同样的事情可以用 完成tokio::select!。与您的代码的主要区别在于,实际的“业务逻辑”位于 等待的 future 内select。
select允许您删除未完成的 future,而不是等待它们自行退出(因此halt不需要终止标志)。
async fn main() {
tokio::select! {
_ = measure_light() => {},
_ = blink_led() = {},
_ = poll_weather() => {},
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
9806 次 |
| 最近记录: |