Rust 中的异步操作什么时候可以退出?

fin*_*ian 3 rust rust-tokio

语境

我正在阅读这个博客。提供了一些代码:

async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
    let len = socket.read_u32().await?;
    let mut line = vec![0; len];
    socket.read_exact(&mut line).await?;
    let line = str::from_utf8(line)?;
    Ok(line)
}

loop {
    select! {
        line_in = parse_line(&socket) => {
            if let Some(line_in) = line_in {
                broadcast_line(line_in);
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

作者声称,如果在执行 parse_line 时收到消息,parse_line则最终可能会处于损坏状态。channel

问题

什么时候可以parse_line中断?是在任何时候吗?根据我目前的理解(这可能是错误的),Rust 可以在等待语句中切换线程上的任务,但在这些点上状态会被存储,以便可以恢复工作。

我的想象

我想象在 中parse_line,Rust 正在将字节加载到line变量中。读取一定数量的字节(并且对于某些 ASCII 字符可能只有一半字节)并在等待更多字节进入时,channel接收一些内容并进行上下文切换。

完成channel.recv()任务后,Rust 返回读取输入,但是提供字节的用户取消了请求,现在没有其他内容可读取。

由于ASCII 字符不完整,现在str::from_utf8(line)?会引发 UTF-8 错误。line

Cha*_*man 6

TL;DR:不是在任何时候,只是在.awaits 处。


异步代码被降低到实现Future. 循环.await调用,当调用者返回时挂起并在 上结束。这实质上是运行内在未来直至完成。Future::poll()Poll::PendingPoll::Ready

poll()但是,如果我们调用几次然后停止而没有完成 future会发生什么?在这种情况下,我们调用的几次poll()已经将 future 推进到执行的某个点,然后我们就停止了。我们已经取消了未来。

关键的观察是这个执行点必须在内部.await。这是因为.await未来的同步、无代码在poll()实现中会转化为直接同步代码,而我们不能就此止步。我们只能在从 回来之后停下来poll(),这要么发生在中间.await,要么发生在完成时。

然而,并非所有期货都可以安全取消。有些人,比如你的parse_line(),在取消后就会失去工作。如果我们在第二个取消.await,则长度已从套接字读取(并丢弃),但主体尚未读取,因此我们会丢失该长度。我们无法恢复它,并且下次调用该函数时,它将看到来自套接字的损坏数据(或者只是跳过一条记录)。

select!取消除第一个完成的期货之外的所有期货,因此此代码有一个错误。

解决办法是永远不要失去未来,但要保留它以供以后使用:

let mut parse_line_fut = std::pin::pin!(parse_line(&socket));
loop {
    select! {
        line_in = parse_line_fut.as_mut() => {
            if let Ok(line_in) = line_in {
                broadcast_line(line_in);

                parse_line_fut.set(parse_line(&socket));
            } else {
                // connection closed, exit loop
                break;
            }
        }
        line_out = channel.recv() => {
            write_line(&socket, line_out).await;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

  • @financial_physical 该值已从流中读取,并且不再存在。该变量被删除。该值不存储在任何地方。 (2认同)