我正在阅读这个博客。提供了一些代码:
async fn parse_line(socket: &TcpStream) -> Result<String, Error> {
let len = socket.read_u32().await?;
let mut line = vec![0; len];
socket.read_exact(&mut line).await?;
let line = str::from_utf8(line)?;
Ok(line)
}
loop {
select! {
line_in = parse_line(&socket) => {
if let Some(line_in) = line_in {
broadcast_line(line_in);
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv() => {
write_line(&socket, line_out).await;
}
}
}
Run Code Online (Sandbox Code Playgroud)
作者声称,如果在执行 parse_line 时收到消息,parse_line则最终可能会处于损坏状态。channel
什么时候可以parse_line中断?是在任何时候吗?根据我目前的理解(这可能是错误的),Rust 可以在等待语句中切换线程上的任务,但在这些点上状态会被存储,以便可以恢复工作。
我想象在 中parse_line,Rust 正在将字节加载到line变量中。读取一定数量的字节(并且对于某些 ASCII 字符可能只有一半字节)并在等待更多字节进入时,channel接收一些内容并进行上下文切换。
完成channel.recv()任务后,Rust 返回读取输入,但是提供字节的用户取消了请求,现在没有其他内容可读取。
由于ASCII 字符不完整,现在str::from_utf8(line)?会引发 UTF-8 错误。line
TL;DR:不是在任何时候,只是在.awaits 处。
异步代码被降低到实现Future. 循环.await调用,当调用者返回时挂起并在 上结束。这实质上是运行内在未来直至完成。Future::poll()Poll::PendingPoll::Ready
poll()但是,如果我们调用几次然后停止而没有完成 future会发生什么?在这种情况下,我们调用的几次poll()已经将 future 推进到执行的某个点,然后我们就停止了。我们已经取消了未来。
关键的观察是这个执行点必须在内部.await。这是因为.await未来的同步、无代码在poll()实现中会转化为直接同步代码,而我们不能就此止步。我们只能在从 回来之后停下来poll(),这要么发生在中间.await,要么发生在完成时。
然而,并非所有期货都可以安全取消。有些人,比如你的parse_line(),在取消后就会失去工作。如果我们在第二个取消.await,则长度已从套接字读取(并丢弃),但主体尚未读取,因此我们会丢失该长度。我们无法恢复它,并且下次调用该函数时,它将看到来自套接字的损坏数据(或者只是跳过一条记录)。
select!取消除第一个完成的期货之外的所有期货,因此此代码有一个错误。
解决办法是永远不要失去未来,但要保留它以供以后使用:
let mut parse_line_fut = std::pin::pin!(parse_line(&socket));
loop {
select! {
line_in = parse_line_fut.as_mut() => {
if let Ok(line_in) = line_in {
broadcast_line(line_in);
parse_line_fut.set(parse_line(&socket));
} else {
// connection closed, exit loop
break;
}
}
line_out = channel.recv() => {
write_line(&socket, line_out).await;
}
}
}
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
42 次 |
| 最近记录: |