我试图使用futures::future::select_ok(游乐场):
use std::time::Duration;
use tokio; // 1.16.1
use futures; // 0.3.19
#[tokio::main]
async fn main() {
let first_future = async {
tokio::time::sleep(Duration::from_secs(1)).await;
Ok(3)
};
let second_future = async {
tokio::time::sleep(Duration::from_millis(100)).await;
Err(())
};
let third_future = async {
tokio::time::sleep(Duration::from_secs(300)).await;
Ok(3)
};
futures::future::select_ok(&[first_future,second_future.await,third_future]).await;
}
Run Code Online (Sandbox Code Playgroud)
并遇到错误:
error[E0308]: mismatched types
--> src/main.rs:19:47
|
7 | let first_future = async {
| ______________________________-
8 | | tokio::time::sleep(Duration::from_secs(1)).await;
9 | | Ok(3)
10 | | };
| |_____- the expected `async` block …Run Code Online (Sandbox Code Playgroud) 我制作了一个 LED 时钟,它还可以显示天气。我的程序在循环中执行几件不同的事情,每件事情都有不同的时间间隔:
更新 LED 是最关键的:我不希望在获取天气等信息时延迟更新。这应该不是问题,因为获取天气主要是异步 HTTP 调用。
这是我的代码:
let mut measure_light_stream = tokio::time::interval(Duration::from_secs(1));
let mut update_weather_stream = tokio::time::interval(WEATHER_FETCH_INTERVAL);
let mut update_leds_stream = tokio::time::interval(UPDATE_LEDS_INTERVAL);
loop {
tokio::select! {
_ = measure_light_stream.tick() => {
let light = lm.get_light();
light_smooth.sp = light;
},
_ = update_weather_stream.tick() => {
let fetched_weather = weather_service.get(&config).await;
// Store the fetched weather for later access from the displaying function.
weather_clock.weather = fetched_weather.clone();
},
_ = update_leds_stream.tick() …Run Code Online (Sandbox Code Playgroud) 我正在测试主线程终止时使用 tokio::spawn 创建的任务的行为方式。根据tokio 文档,当运行时关闭时,所有任务都会被删除。
无法保证生成的任务将执行完成。当运行时关闭时,所有未完成的任务都将被删除,无论该任务的生命周期如何。
然而,当测试如下运行时,即使运行时终止,任务似乎也没有终止。
[package]
name = "tokio_test"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["rt", "test-util", "macros", "rt-multi-thread"]}
Run Code Online (Sandbox Code Playgroud)
use std::{thread, time};
#[tokio::main]
async fn main() {
println!("tokio test - main thread...");
tokio::spawn(async move {
let mut i = 0;
loop {
println!(
"tokio test - tokio spawn(worker) thread... sleep(1): {}/10",
i
);
thread::sleep(time::Duration::from_millis(1000));
i += 1;
if i > 3 {
break;
}
}
}); …Run Code Online (Sandbox Code Playgroud) 我想捕获 Rust 跟踪中跨度的执行持续时间并将其作为指标发送。
我发现 fmt() 有助于打印这里提到的内容:How can I log spanuration with Rust Tracing?
我还尝试过这个关于创建层和实现 on_new_span() 和 on_event() 的示例。我还添加了 on_close() 来检查我们在这里获得了哪些元数据。我写的代码是:
use tracing::{info, info_span};
use tracing_subscriber::prelude::*;
mod custom_layer;
use custom_layer::CustomLayer;
fn main() {
tracing_subscriber::registry()
.with(CustomLayer)
.init();
let outer_span = info_span!("Outer", level = 0, other_field = tracing::field::Empty);
let _outer_entered = outer_span.enter();
outer_span.record("other_field", &7);
let inner_span = info_span!("inner", level = 1);
let _inner_entered = inner_span.enter();
info!(a_bool = true, answer = 42, message = "first example");
}
Run Code Online (Sandbox Code Playgroud)
自定义层.rs:
use std::collections::BTreeMap;
use …Run Code Online (Sandbox Code Playgroud) 我是异步编程的新手,因此努力解决不同方法的行为差异。
考虑 tokio 在 github 存储库 chat.rs 中给出的示例:
// snip
loop {
tokio::select! {
// A message was received from a peer. Send it to the current user.
Some(msg) = peer.rx.recv() => {
// do something
}
result = peer.lines.next() => match result {
// A message was received from the current user, we should
// broadcast this message to the other users.
Some(Ok(msg)) => {
// do something
}
Some(Err(e)) => {
// handle error
}
// The stream …Run Code Online (Sandbox Code Playgroud) 在这个 Tokio 教程中,它有代码:
tokio::spawn(async move {
process(socket).await;
});
Run Code Online (Sandbox Code Playgroud)
我不明白为什么async move {}这里使用块。据我所知,到目前为止这是没有必要的。下面的方法就可以了,而且更加简洁和直接:
tokio::spawn(process(socket));
Run Code Online (Sandbox Code Playgroud)
所以,我的问题是
async需要该块?async添加额外的间接层,可能会导致性能略有下降?这个简单的程序会产生编译器错误:
#[tokio::main]
async fn main() {
tokio::spawn(async {
foo().await;
});
}
async fn foo() {
let f1 = bar();
let f2 = bar();
tokio::join!(f1, f2);
}
async fn bar() -> Result<(), Box<dyn std::error::Error>> {
println!("Hello world");
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
error[E0277]: `(dyn std::error::Error + 'static)` cannot be sent between threads safely
--> src/main.rs:5:18
|
5 | tokio::spawn(async {
| _____------------_^
| | |
| | required by a bound introduced by this call
6 | | foo().await;
7 | | });
| …Run Code Online (Sandbox Code Playgroud) 我写了如下代码,可以编译成功,
#[tokio::main]
async fn main() {
}
Run Code Online (Sandbox Code Playgroud)
但我很好奇为什么mainproc_macro 可以直接使用而不需要任何前奏或显式的 use 语句?
我正在阅读这个博客。提供了一些代码:
async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
let len = socket.read_u32().await?;
let mut line = vec![0; len];
socket.read_exact(&mut line).await?;
let line = str::from_utf8(line)?;
Ok(line)
}
loop {
select! {
line_in = parse_line(&socket) => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv() => {
write_line(&socket, line_out).await;
}
}
}
Run Code Online (Sandbox Code Playgroud)
作者声称,如果在执行 parse_line 时收到消息,parse_line则最终可能会处于损坏状态。channel
什么时候可以parse_line中断?是在任何时候吗?根据我目前的理解(这可能是错误的),Rust 可以在等待语句中切换线程上的任务,但在这些点上状态会被存储,以便可以恢复工作。 …
有没有办法使用#[tokio::main]tokio 0.2 中的属性指定单线程运行时?该文档似乎没有这方面的示例。
编辑:我想找到一种方法来设置 tokio 运行时,以便rustc知道tokio:spawn()不会是一个新线程。