我可以通过在两个异步接收器上调用 select 来错过一个值吗?

ube*_*ben 6 rust async-await rust-tokio

是否有可能,如果一个任务发送到a另一个(同时)发送到b,通过取消剩余的未来来tokio::select!打开ab删除一个值?还是保证在下一次循环迭代时收到?

use tokio::sync::mpsc::Receiver;

async fn foo(mut a: Receiver<()>, mut b: Receiver<()>) {
    loop {
        tokio::select!{
            _ = a.recv() => {
                println!("A!");
            }
            _ = b.recv() => {
                println!("B!");
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

async在那种情况下,我的思绪无法绕过魔法背后真正发生的事情。

use*_*198 3

它似乎在任何地方的文档中都没有得到保证,但由于基于 rusts poll 的架构的工作方式,它可能适用于直接从通道读取。select 相当于以随机顺序轮询每个 future,直到其中一个就绪,或者如果没有就绪,则等待唤醒器收到信号,然后重复该过程。仅当成功轮询返回时,消息才会从通道中删除。成功的轮询会停止选择,因此其余通道将不会被触及。因此,下次循环发生时将轮询它们,然后返回消息。

然而,这是一种危险的方法,因为如果接收器被替换为返回一个比直接读取更复杂的 future 的东西,它可能会在读取后挂起,那么当这种情况发生时,您可能会丢失消息。因此,它可能应该被视为不起作用。更安全的方法是将 future 存储在可变变量中,并在它们触发时更新:

use tokio::sync::mpsc::Receiver;

async fn foo(mut a: Receiver<()>, mut b: Receiver<()>) {
    let mut a_fut = a.recv();
    let mut b_fut = b.recv();
    loop {
        tokio::select!{
            _ = a_fut => {
                println!("A!");
                a_fut = a.recv();
            }
            _ = b_fut => {
                println!("B!");
                b_fut = b.recv();
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • 这是真实的!但当前的 tokio 实现也适用于更简单的版本 - 我认为团队意识到更改它将是一个重大更改。另一种看待它的变体是说当前的“Receiver”实现了“Stream”——这要求轮询是无状态的。只要接收者保持“流”状态,原始代码就会继续工作。如果 `Receiver::recv()` 返回一个 `Future`,它的作用比 `Stream::next()` 多,那么它可能会中断 (2认同)